This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 472049d  IGNITE-14679 Schema serialization. (#349)
472049d is described below

commit 472049d6d37b59501fde65dbd14a5dcc2960e126
Author: Vladimir Ermakov <[email protected]>
AuthorDate: Thu Sep 30 11:36:45 2021 +0300

    IGNITE-14679 Schema serialization. (#349)
---
 .../ignite/internal/schema/VarlenNativeType.java   |   7 +
 .../schema/AbstractSchemaSerializer.java           | 109 +++
 .../schema/SchemaSerializer.java}                  |  44 +-
 .../marshaller/schema/SchemaSerializerImpl.java    | 743 +++++++++++++++++++++
 .../schema/serializer/AbstractSerializerTest.java  | 164 +++++
 .../internal/table/distributed/TableManager.java   |  12 +-
 6 files changed, 1052 insertions(+), 27 deletions(-)

diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
index 867ec04..32a4283 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
@@ -41,6 +41,13 @@ public class VarlenNativeType extends NativeType {
         return super.mismatch(type) || len < ((VarlenNativeType)type).len;
     }
 
+    /**
+     * @return Length of the type.
+     */
+    public int length() {
+        return len;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(VarlenNativeType.class.getSimpleName(), "name", 
spec(), "len", len);
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
new file mode 100644
index 0000000..ccbf111
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.marshaller.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+/**
+ * Schema serializer.
+ */
+public abstract class AbstractSchemaSerializer implements SchemaSerializer {
+    /** Schema serializer version. */
+    protected final short version;
+
+    /** Previous version serializer. */
+    protected final AbstractSchemaSerializer previous;
+
+    /**
+     * @param ver Serializer version.
+     * @param previous Previous version serializer.
+     */
+    protected AbstractSchemaSerializer(short ver, AbstractSchemaSerializer 
previous) {
+        this.version = ver;
+        this.previous = previous;
+    }
+
+    /**
+     * @param ver Serializer version.
+     */
+    protected AbstractSchemaSerializer(short ver) {
+        this(ver, null);
+    }
+
+    /**
+     * @return Serializer version;
+     */
+    public short getVersion() {
+        return version;
+    }
+
+    /**
+     * Serialize SchemaDescriptor object to byte array.
+     *
+     * @param desc SchemaDescriptor object.
+     * @return SchemaDescriptor byte array representation.
+     */
+    public byte[] serialize(SchemaDescriptor desc) {
+        ByteBuffer buf = ByteBuffer.allocate(size(desc));
+
+        this.writeTo(desc, buf);
+
+        return buf.array();
+    }
+
+    /**
+     * Deserialize byte array to SchemaDescriptor object.
+     *
+     * @param bytes SchemaDescriptor byte array representation.
+     * @return SchemaDescriptor object.
+     */
+    public SchemaDescriptor deserialize(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short ver = readVersion(buf);
+
+        return getSerializerByVersion(ver).readFrom(buf);
+    }
+
+    /**
+     * Gets schema serializer by version.
+     *
+     * @param ver SchemaSerializer target version.
+     * @return SchemaSerializer object.
+     * @throws IllegalArgumentException If SchemaSerializer with right version 
is not found.
+     */
+    private SchemaSerializer getSerializerByVersion(short ver) {
+        if (ver == this.version)
+            return this;
+        else if (this.previous == null)
+            throw new IllegalArgumentException("Unable to find schema 
serializer with version " + ver);
+
+        return this.previous.getSerializerByVersion(ver);
+    }
+
+    /**
+     * Reads SchemaSerializer version from byte buffer.
+     *
+     * @param buf ByteBuffer object.
+     * @return SchemaSerializer version.
+     */
+    private short readVersion(ByteBuffer buf) {
+        return buf.getShort();
+    }
+}
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
similarity index 50%
copy from 
modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
index 867ec04..b623b28 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
@@ -15,34 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema;
+package org.apache.ignite.internal.schema.marshaller.schema;
 
-import org.apache.ignite.internal.tostring.S;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 
 /**
- * Variable-length native type.
+ * SchemaDescriptor (De)Serializer interface.
  */
-public class VarlenNativeType extends NativeType {
-    /** Length of the type. */
-    private final int len;
-
+public interface SchemaSerializer {
     /**
-     * @param typeSpec Type spec.
-     * @param len Type length.
+     * Writes SchemaDescriptor object to byte buffer.
+     *
+     * @param desc SchemaDescriptor object.
+     * @param byteBuf ByteBuffer object with allocated byte array.
      */
-    protected VarlenNativeType(NativeTypeSpec typeSpec, int len) {
-        super(typeSpec);
-
-        this.len = len;
-    }
+    void writeTo(SchemaDescriptor desc, ByteBuffer byteBuf);
 
-    /** {@inheritDoc} */
-    @Override public boolean mismatch(NativeType type) {
-        return super.mismatch(type) || len < ((VarlenNativeType)type).len;
-    }
+    /**
+     * @param byteBuf Byte buffer with byte array.
+     * @return SchemaDescriptor object.
+     */
+    SchemaDescriptor readFrom(ByteBuffer byteBuf);
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VarlenNativeType.class.getSimpleName(), "name", 
spec(), "len", len);
-    }
+    /**
+     * Calculates size in bytes of SchemaDescriptor object.
+     *
+     * @param desc SchemaDescriptor object.
+     * @return size in bytes.
+     */
+    int size(SchemaDescriptor desc);
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
new file mode 100644
index 0000000..df0ef8d
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.marshaller.schema;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.BitmaskNativeType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.NumberNativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TemporalNativeType;
+import org.apache.ignite.internal.schema.VarlenNativeType;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+
+/**
+ * Serialize SchemaDescriptor object to byte array and vice versa.
+ */
+public class SchemaSerializerImpl extends AbstractSchemaSerializer {
+    /** Instance. */
+    public static final AbstractSchemaSerializer INSTANCE = new 
SchemaSerializerImpl();
+
+    /** String array length. */
+    private static final int STRING_HEADER = 4;
+
+    /** Array length. */
+    private static final int ARRAY_HEADER_LENGTH = 4;
+
+    /** Byte. */
+    private static final int BYTE = 1;
+
+    /** Short. */
+    private static final int SHORT = 2;
+
+    /** Int. */
+    private static final int INT = 4;
+
+    /** Long. */
+    private static final int LONG = 8;
+
+    /** Float. */
+    private static final int FLOAT = 4;
+
+    /** Double. */
+    private static final int DOUBLE = 8;
+
+    /** Schema version. */
+    private static final short SCHEMA_VER = 1;
+
+    /**
+     * Default constructor.
+     */
+    public SchemaSerializerImpl() {
+        super(SCHEMA_VER);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeTo(SchemaDescriptor desc, ByteBuffer byteBuf) {
+        byteBuf.putShort(SCHEMA_VER);
+        byteBuf.putInt(desc.version());
+
+        appendColumns(desc.keyColumns(), byteBuf);
+        appendColumns(desc.valueColumns(), byteBuf);
+
+        Column[] affinityCols = desc.affinityColumns();
+
+        byteBuf.putInt(affinityCols.length);
+
+        for (Column column : affinityCols)
+            appendString(column.name(), byteBuf);
+
+        appendColumnMapping(desc.columnMapping(), desc.length(), byteBuf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaDescriptor readFrom(ByteBuffer byteBuf) {
+        int ver = byteBuf.getInt();
+
+        Column[] keyCols = readColumns(byteBuf);
+        Column[] valCols = readColumns(byteBuf);
+
+        int affinityColsSize = byteBuf.getInt();
+
+        String[] affinityCols = new String[affinityColsSize];
+
+        for (int i = 0; i < affinityColsSize; i++)
+            affinityCols[i] = readString(byteBuf);
+
+        SchemaDescriptor descriptor = new SchemaDescriptor(ver, keyCols, 
affinityCols, valCols);
+
+        ColumnMapper mapper = readColumnMapping(descriptor, byteBuf);
+
+        descriptor.columnMapping(mapper);
+
+        return descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size(SchemaDescriptor desc) {
+        return SHORT +                      //Assembler version
+            INT +                          //Descriptor version
+            getColumnsSize(desc.keyColumns()) +
+            getColumnsSize(desc.valueColumns()) +
+            ARRAY_HEADER_LENGTH +          //Affinity columns length
+            getStringArraySize(desc.affinityColumns()) +
+            getColumnMappingSize(desc.columnMapping(), desc.length());
+    }
+
+    /**
+     * Gets column mapping size in bytes.
+     *
+     * @param len Column array length (both key and value columns).
+     * @return Size of column mapping.
+     */
+    private int getColumnMappingSize(ColumnMapper mapper, int len) {
+        int size = INT;
+
+        for (int i = 0; i < len; i++) {
+            if (mapper.map(i) != i) {
+                size += INT;
+                size += INT;
+
+                if (mapper.map(i) == -1)
+                    size += getColumnSize(mapper.mappedColumn(i));
+            }
+        }
+
+        return size;
+    }
+
+    /**
+     * Gets column names array size in bytes.
+     *
+     * @param cols Column array.
+     * @return Size of an array with column names.
+     */
+    private int getStringArraySize(Column[] cols) {
+        int size = ARRAY_HEADER_LENGTH;      //String array size header
+        for (Column column : cols)
+            size += getStringSize(column.name());
+
+        return size;
+    }
+
+    /**
+     * Gets columns array size in bytes.
+     *
+     * @param cols Column array.
+     * @return Size of column array, including column name and column native 
type.
+     */
+    private int getColumnsSize(Columns cols) {
+        int size = ARRAY_HEADER_LENGTH; //cols array length
+
+        for (Column column : cols.columns())
+            size += getColumnSize(column);
+
+        return size;
+    }
+
+    /**
+     * Gets column size in bytes.
+     *
+     * @param col Column object.
+     * @return Column size in bytes.
+     */
+    private int getColumnSize(Column col) {
+        return INT +                      //Schema index
+            BYTE +                         //nullable flag
+            getStringSize(col.name()) +
+            getNativeTypeSize(col.type()) +
+            BYTE + getDefaultObjectSize(col.type(), col.defaultValue());
+    }
+
+    /**
+     * Gets default object size in bytes based on object native type.
+     *
+     * @param type Column native type.
+     * @param val Object.
+     * @return Object size in bytes.
+     */
+    private int getDefaultObjectSize(NativeType type, Object val) {
+        if (val == null)
+            return 0;
+
+        switch (type.spec()) {
+            case INT8:
+                return BYTE;
+
+            case INT16:
+                return SHORT;
+
+            case INT32:
+                return INT;
+
+            case INT64:
+                return LONG;
+
+            case FLOAT:
+                return FLOAT;
+
+            case DOUBLE:
+                return DOUBLE;
+
+            case DECIMAL:
+                return INT + INT + 
((BigDecimal)val).unscaledValue().toByteArray().length;
+
+            case UUID:
+                return LONG + LONG;
+
+            case STRING:
+                return getStringSize(((String)val));
+
+            case BYTES:
+                return INT + ((byte[])val).length;
+
+            case BITMASK:
+                return INT + ((BitSet)val).toByteArray().length;
+
+            case NUMBER:
+                return INT + ((BigInteger)val).toByteArray().length;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Gets native type size in bytes.
+     *
+     * @param type Native type.
+     * @return Native type size depending on NativeTypeSpec params.
+     */
+    private int getNativeTypeSize(NativeType type) {
+        int typeSize = 0;
+
+        switch (type.spec()) {
+            case STRING:
+            case BYTES:
+            case TIME:
+            case DATETIME:
+            case TIMESTAMP:
+            case NUMBER:
+            case BITMASK:
+                typeSize += INT; //For precision, len or bits
+
+                break;
+            case DECIMAL:
+                typeSize += INT; //For precision
+                typeSize += INT; //For scale
+
+                break;
+            default:
+                break;
+        }
+
+        return getStringSize(type.spec().name()) + //native type name
+            typeSize;
+    }
+
+    /**
+     * Gets string size in bytes.
+     *
+     * @param str String.
+     * @return Byte array size.
+     */
+    private int getStringSize(String str) {
+        return STRING_HEADER + //string byte array header
+            str.getBytes().length; // string byte array length
+    }
+
+    /**
+     * Appends column mapping to byte buffer.
+     *
+     * @param mapper ColumnMapper object.
+     * @param len Column array length (both key and value columns).
+     * @param buff Allocated ByteBuffer.
+     */
+    private void appendColumnMapping(ColumnMapper mapper, int len, ByteBuffer 
buff) {
+        int mappingSize = 0;
+        for (int i = 0; i < len; i++) {
+            if (mapper.map(i) != i)
+                mappingSize += 1;
+        }
+
+        buff.putInt(mappingSize);
+
+        for (int i = 0; i < len; i++) {
+            if (mapper.map(i) != i) {
+                buff.putInt(i);
+                buff.putInt(mapper.map(i));
+
+                if (mapper.map(i) == -1)
+                    appendColumn(mapper.mappedColumn(i), buff);
+            }
+        }
+    }
+
+    /**
+     * Appends column array to byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @param cols Column array.
+     */
+    private void appendColumns(Columns cols, ByteBuffer buf) {
+        Column[] colArr = cols.columns();
+
+        buf.putInt(colArr.length);
+
+        for (Column column : colArr)
+            appendColumn(column, buf);
+    }
+
+    /**
+     * Appends column to byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @param col Column.
+     */
+    private void appendColumn(Column col, ByteBuffer buf) {
+        buf.putInt(col.schemaIndex());
+        buf.put((byte)(col.nullable() ? 1 : 0));
+
+        appendString(col.name(), buf);        
+        appendNativeType(buf, col.type());
+
+        appendDefaultValue(buf, col.type(), col.defaultValue());
+    }
+
+    /**
+     * Appends default value object to byte buffer based on native type.
+     *
+     * @param buf Allocated ByteBuffer.
+     * @param type Column native type.
+     * @param val Default object value.
+     */
+    private void appendDefaultValue(ByteBuffer buf, NativeType type, Object 
val) {
+        boolean isPresent = val != null;
+
+        buf.put((byte)(isPresent ? 1 : 0));
+
+        if (!isPresent)
+            return;
+
+        switch (type.spec()) {
+            case INT8: {
+                buf.put((byte)val);
+
+                break;
+            }
+            case INT16: {
+                buf.putShort((short)val);
+
+                break;
+            }
+            case INT32: {
+                buf.putInt((int)val);
+
+                break;
+            }
+            case INT64: {
+                buf.putLong((long)val);
+
+                break;
+            }
+            case FLOAT: {
+                buf.putFloat((float)val);
+
+                break;
+            }
+            case DOUBLE: {
+                buf.putDouble((double)val);
+
+                break;
+            }
+            case DECIMAL: {
+                BigDecimal decimal = (BigDecimal)val;
+
+                buf.putInt(decimal.scale());
+                appendByteArray(decimal.unscaledValue().toByteArray(), buf);
+
+                break;
+            }
+            case UUID: {
+                UUID uuid = (UUID)val;
+
+                buf.putLong(uuid.getMostSignificantBits());
+                buf.putLong(uuid.getLeastSignificantBits());
+
+                break;
+            }
+            case STRING: {
+                appendString((String)val, buf);
+
+                break;
+            }
+            case BYTES: {
+                appendByteArray((byte[])val, buf);
+
+                break;
+            }
+            case BITMASK: {
+                BitSet bitSet = (BitSet)val;
+                appendByteArray(bitSet.toByteArray(), buf);
+
+                break;
+            }
+            case NUMBER: {
+                BigInteger bigInt = (BigInteger)val;
+
+                appendByteArray(bigInt.toByteArray(), buf);
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * Appends native type to byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @param type Native type.
+     */
+    private void appendNativeType(ByteBuffer buf, NativeType type) {
+        appendString(type.spec().name(), buf);
+
+        switch (type.spec()) {
+            case STRING:
+            case BYTES: {
+                int len = ((VarlenNativeType)type).length();
+
+                buf.putInt(len);
+
+                break;
+            }
+            case BITMASK: {
+                int bits = ((BitmaskNativeType)type).bits();
+
+                buf.putInt(bits);
+
+                break;
+            }
+            case DECIMAL: {
+                int precision = ((DecimalNativeType)type).precision();
+                int scale = ((DecimalNativeType)type).scale();
+
+                buf.putInt(precision);
+                buf.putInt(scale);
+
+                break;
+            }
+            case TIME:
+            case DATETIME:
+            case TIMESTAMP: {
+                int precision = ((TemporalNativeType)type).precision();
+
+                buf.putInt(precision);
+
+                break;
+            }
+            case NUMBER: {
+                int precision = ((NumberNativeType)type).precision();
+
+                buf.putInt(precision);
+
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Appends string byte representation to byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @param str String.
+     */
+    private void appendString(String str, ByteBuffer buf) {
+        appendByteArray(str.getBytes(), buf);
+    }
+
+    /**
+     * Appends byte array to byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @param bytes Byte array.
+     */
+    private void appendByteArray(byte[] bytes, ByteBuffer buf) {
+        buf.putInt(bytes.length);
+        buf.put(bytes);
+    }
+
+    /**
+     * Reads column mapping from byte buffer.
+     *
+     * @param desc SchemaDescriptor.
+     * @param buf Byte buffer.
+     * @return ColumnMapper object.
+     */
+    private ColumnMapper readColumnMapping(SchemaDescriptor desc, ByteBuffer 
buf) {
+        int mappingSize = buf.getInt();
+
+        if (mappingSize == 0)
+            return ColumnMapping.identityMapping();
+
+        ColumnMapper mapper = ColumnMapping.createMapper(desc);
+
+        for (int i = 0; i < mappingSize; i++) {
+            int from = buf.getInt();
+            int to = buf.getInt();
+
+            if (to == -1) {
+                Column col = readColumn(buf);
+                mapper.add(col);
+            } else
+                mapper.add(from, to);
+        }
+
+        return mapper;
+    }
+
+    /**
+     * Reads column array from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Column array.
+     */
+    private Column[] readColumns(ByteBuffer buf) {
+        int size = buf.getInt();
+
+        Column[] colArr = new Column[size];
+
+        for (int i = 0; i < size; i++)
+            colArr[i] = readColumn(buf);
+
+        return colArr;
+    }
+
+    /**
+     * Reads column from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Column.
+     */
+    private Column readColumn(ByteBuffer buf) {
+        int schemaIdx = buf.getInt();
+        boolean nullable = buf.get() == 1;
+        String name = readString(buf);
+
+        NativeType nativeType = fromByteBuffer(buf);
+
+        Object object = readDefaultValue(buf, nativeType);
+
+        return new Column(name, nativeType, nullable, () -> 
object).copy(schemaIdx);
+    }
+
+    /**
+     * Reads default value object or null.
+     *
+     * @param buf ByteBuffer.
+     * @param type Column native type.
+     * @return Column default value.
+     */
+    private Object readDefaultValue(ByteBuffer buf, NativeType type) {
+
+        boolean isPresent = buf.get() == 1;
+
+        if (!isPresent)
+            return null;
+
+        switch (type.spec()) {
+            case INT8:
+                return buf.get();
+
+            case INT16:
+                return buf.getShort();
+
+            case INT32:
+                return buf.getInt();
+
+            case INT64:
+                return buf.getLong();
+
+            case FLOAT:
+                return buf.getFloat();
+
+            case DOUBLE:
+                return buf.getDouble();
+
+            case DECIMAL: {
+                int scale = buf.getInt();
+                byte[] bytes = readByteArray(buf);
+
+                return new BigDecimal(new BigInteger(bytes), scale);
+            }
+            case UUID:
+                return new UUID(buf.getLong(), buf.getLong());
+
+            case STRING:
+                return readString(buf);
+
+            case BYTES:
+                return readByteArray(buf);
+
+            case BITMASK:
+                return BitSet.valueOf(readByteArray(buf));
+
+            case NUMBER:
+                return new BigInteger(readByteArray(buf));
+        }
+
+        return null;
+    }
+
+    /**
+     * Reads native type from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Native type.
+     */
+    private NativeType fromByteBuffer(ByteBuffer buf) {
+        String nativeTypeSpecName = readString(buf);
+
+        NativeTypeSpec spec = NativeTypeSpec.valueOf(nativeTypeSpecName);
+
+        switch (spec) {
+            case STRING:
+                int strLen = buf.getInt();
+
+                return NativeTypes.stringOf(strLen);
+
+            case BYTES:
+                int len = buf.getInt();
+
+                return NativeTypes.blobOf(len);
+
+            case BITMASK:
+                int bits = buf.getInt();
+
+                return NativeTypes.bitmaskOf(bits);
+
+            case DECIMAL: {
+                int precision = buf.getInt();
+                int scale = buf.getInt();
+
+                return NativeTypes.decimalOf(precision, scale);
+            }
+            case TIME: {
+                int precision = buf.getInt();
+
+                return NativeTypes.time(precision);
+            }
+            case DATETIME: {
+                int precision = buf.getInt();
+
+                return NativeTypes.datetime(precision);
+            }
+            case TIMESTAMP: {
+                int precision = buf.getInt();
+
+                return NativeTypes.timestamp(precision);
+            }
+            case NUMBER: {
+                int precision = buf.getInt();
+
+                return NativeTypes.numberOf(precision);
+            }
+            case INT8:
+                return NativeTypes.INT8;
+
+            case INT16:
+                return NativeTypes.INT16;
+
+            case INT32:
+                return NativeTypes.INT32;
+
+            case INT64:
+                return NativeTypes.INT64;
+
+            case FLOAT:
+                return NativeTypes.FLOAT;
+
+            case DOUBLE:
+                return NativeTypes.DOUBLE;
+
+            case UUID:
+                return NativeTypes.UUID;
+
+            case DATE:
+                return NativeTypes.DATE;
+        }
+
+        throw new InvalidTypeException("Unexpected type " + spec);
+    }
+
+    /**
+     * Reads string from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return String.
+     */
+    private String readString(ByteBuffer buf) {
+        return new String(readByteArray(buf));
+    }
+
+    /**
+     * Reads byte array from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Byte array.
+     */
+    private byte[] readByteArray(ByteBuffer buf) {
+        int len = buf.getInt();
+        byte[] arr = new byte[len];
+
+        buf.get(arr);
+
+        return arr;
+    }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
new file mode 100644
index 0000000..5d3fe56
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.serializer;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import 
org.apache.ignite.internal.schema.marshaller.schema.AbstractSchemaSerializer;
+import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * SchemaDescriptor (de)serializer test.
+ */
+public class AbstractSerializerTest {
+    /**
+     * (de)Serialize schema test.
+     */
+    @Test
+    public void schemaSerializeTest() {
+        AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+        SchemaDescriptor desc = new SchemaDescriptor(100500,
+            new Column[] {
+                new Column("A", NativeTypes.INT8, false),
+                new Column("B", NativeTypes.INT16, false),
+                new Column("C", NativeTypes.INT32, false),
+                new Column("D", NativeTypes.INT64, false),
+                new Column("E", NativeTypes.UUID, false),
+                new Column("F", NativeTypes.FLOAT, false),
+                new Column("G", NativeTypes.DOUBLE, false),
+                new Column("H", NativeTypes.DATE, false),
+            },
+            new Column[] {
+                new Column("A1", NativeTypes.stringOf(128), false),
+                new Column("B1", NativeTypes.numberOf(255), false),
+                new Column("C1", NativeTypes.decimalOf(128, 64), false),
+                new Column("D1", NativeTypes.bitmaskOf(256), false),
+                new Column("E1", NativeTypes.datetime(8), false),
+                new Column("F1", NativeTypes.time(8), false),
+                new Column("G1", NativeTypes.timestamp(8), true)
+            }
+        );
+
+        byte[] serialize = assembler.serialize(desc);
+
+        SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+        assertEquals(desc.version(), deserialize.version());
+
+        assertArrayEquals(desc.keyColumns().columns(), 
deserialize.keyColumns().columns());
+        assertArrayEquals(desc.valueColumns().columns(), 
deserialize.valueColumns().columns());
+        assertArrayEquals(desc.affinityColumns(), 
deserialize.affinityColumns());
+    }
+
+    /**
+     * (de)Serialize default value test.
+     */
+    @Test
+    public void defaultValueSerializeTest() {
+        AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+        SchemaDescriptor desc = new SchemaDescriptor(100500,
+            new Column[] {
+                new Column("A", NativeTypes.INT8, false, () -> (byte)1),
+                new Column("B", NativeTypes.INT16, false, () -> (short)1),
+                new Column("C", NativeTypes.INT32, false, () -> 1),
+                new Column("D", NativeTypes.INT64, false, () -> 1L),
+                new Column("E", NativeTypes.UUID, false, () -> new 
UUID(12,34)),
+                new Column("F", NativeTypes.FLOAT, false, () -> 1.0f),
+                new Column("G", NativeTypes.DOUBLE, false, () -> 1.0d),
+                new Column("H", NativeTypes.DATE, false),
+            },
+            new Column[] {
+                new Column("A1", NativeTypes.stringOf(128), false, () -> 
"test"),
+                new Column("B1", NativeTypes.numberOf(255), false, () -> 
BigInteger.TEN),
+                new Column("C1", NativeTypes.decimalOf(128, 64), false, () -> 
BigDecimal.TEN),
+                new Column("D1", NativeTypes.bitmaskOf(256), false, 
BitSet::new)
+            }
+        );
+
+        byte[] serialize = assembler.serialize(desc);
+
+        SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+        //key columns
+        assertEquals(deserialize.column("A").defaultValue(), (byte)1);
+        assertEquals(deserialize.column("B").defaultValue(), (short)1);
+        assertEquals(deserialize.column("C").defaultValue(), 1);
+        assertEquals(deserialize.column("D").defaultValue(), 1L);
+        assertEquals(deserialize.column("E").defaultValue(), new UUID(12,34));
+        assertEquals(deserialize.column("F").defaultValue(), 1.0f);
+        assertEquals(deserialize.column("G").defaultValue(), 1.0d);
+        assertNull(deserialize.column("H").defaultValue());
+
+        //value columns
+        assertEquals(deserialize.column("A1").defaultValue(), "test");
+        assertEquals(deserialize.column("B1").defaultValue(), BigInteger.TEN);
+        assertEquals(deserialize.column("C1").defaultValue(), BigDecimal.TEN);
+        assertEquals(deserialize.column("D1").defaultValue(), new BitSet());
+    }
+
+    /**
+     * (de)Serialize column mapping test.
+     */
+    @Test
+    public void columnMappingSerializeTest() {
+        AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+        SchemaDescriptor desc = new SchemaDescriptor(100500,
+            new Column[] {
+                new Column("A", NativeTypes.INT8, false, () -> (byte)1)
+            },
+            new Column[] {
+                new Column("A1", NativeTypes.stringOf(128), false, () -> 
"test"),
+                new Column("B1", NativeTypes.numberOf(255), false, () -> 
BigInteger.TEN)
+            }
+        );
+
+        ColumnMapper mapper = ColumnMapping.createMapper(desc);
+
+        mapper.add(0, 1);
+
+        Column c1 = new Column("C1", NativeTypes.stringOf(128), false, () -> 
"brandNewColumn").copy(2);
+
+        mapper.add(c1);
+
+        desc.columnMapping(mapper);
+
+        byte[] serialize = assembler.serialize(desc);
+
+        SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+        ColumnMapper mapper1 = deserialize.columnMapping();
+
+        assertEquals(1, mapper1.map(0));
+        assertEquals(c1, mapper1.mappedColumn(2));
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index eb10ede..42dda6e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.metastorage.client.Entry;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaUtils;
+import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
@@ -165,8 +166,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                             @NotNull 
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
                             try {
                                 
((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaView()).
-                                    
onSchemaRegistered((SchemaDescriptor)ByteUtils.
-                                        
fromBytes(schemasCtx.newValue().schema()));
+                                    onSchemaRegistered(
+                                        
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
+                                    );
 
                                 fireEvent(TableEvent.ALTER, new 
TableEventParameters(tablesById.get(tblId)), null);
                             }
@@ -198,7 +200,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                     ctx.newValue().name(),
                     
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
                     
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
-                    
(SchemaDescriptor)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).schemas().
+                    
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
                         get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
                 );
 
@@ -452,7 +454,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                         schemasCh -> schemasCh.create(
                                             
String.valueOf(INITIAL_SCHEMA_VERSION),
                                             schemaCh -> schemaCh.changeSchema(
-                                                ByteUtils.toBytes(
+                                                
SchemaSerializerImpl.INSTANCE.serialize(
                                                     
SchemaUtils.prepareSchemaDescriptor(
                                                         
((ExtendedTableView)ch).schemas().size(),
                                                         ch
@@ -539,7 +541,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                 tblCh
                                             ));
 
-                                            
schemaCh.changeSchema(ByteUtils.toBytes(descriptor));
+                                            
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
                                         }
                                     )
                             ));

Reply via email to