This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-14743-compaction
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit fae6c3aace4dde9cc49e80fa1373b1a5713c99cb
Author: Andrew Mashenkov <[email protected]>
AuthorDate: Thu Jun 24 12:39:18 2021 +0300

    Minor.
---
 .../marshaller/reflection/JavaSerializer.java      |  15 +-
 .../internal/schema/row/ExpandableByteBuf.java     |   9 -
 .../org/apache/ignite/internal/schema/row/Row.java |   1 -
 .../ignite/internal/schema/row/RowAssembler.java   | 292 ++++++++++-----------
 .../ignite/internal/table/TupleMarshallerImpl.java |  99 +++----
 5 files changed, 199 insertions(+), 217 deletions(-)

diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index 2e4f4b3..06ac189 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -85,7 +85,7 @@ public class JavaSerializer extends AbstractSerializer {
         ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), 
keyMarsh, key);
         ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), 
valMarsh, val);
 
-        return new RowAssembler(schema, keyStat.maxChunkDataSize, 
keyStat.nonNullCols, valStat.maxChunkDataSize, valStat.nonNullCols);
+        return new RowAssembler(schema, keyStat.nonNullColsSize, 
keyStat.nonNullCols, valStat.nonNullColsSize, valStat.nonNullCols);
     }
 
     /**
@@ -98,10 +98,10 @@ public class JavaSerializer extends AbstractSerializer {
      */
     private ObjectStatistic collectObjectStats(Columns cols, Marshaller marsh, 
Object obj) {
         if (obj == null || !cols.hasVarlengthColumns())
-            return new ObjectStatistic(0, cols.fixsizeMaxLen());
+            return ObjectStatistic.ZERO_VARLEN_STATISTICS;
 
         int cnt = 0;
-        int size = cols.fixsizeMaxLen();
+        int size = 0;
 
         for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) {
             final Object val = marsh.value(obj, i);
@@ -138,16 +138,19 @@ public class JavaSerializer extends AbstractSerializer {
      * Object statistic.
      */
     private static class ObjectStatistic {
+        /** Cached zero statistics. */
+        static final ObjectStatistic ZERO_VARLEN_STATISTICS = new 
ObjectStatistic(0,0);
+
         /** Non-null columns of varlen type. */
         int nonNullCols;
 
         /** Length of all non-null columns of varlen types. */
-        int maxChunkDataSize;
+        int nonNullColsSize;
 
         /** Constructor. */
-        ObjectStatistic(int nonNullCols, int maxRowSize) {
+        ObjectStatistic(int nonNullCols, int nonNullColsSize) {
             this.nonNullCols = nonNullCols;
-            this.maxChunkDataSize = maxRowSize;
+            this.nonNullColsSize = nonNullColsSize;
         }
     }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
index 74f5099..159153e 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java
@@ -280,13 +280,4 @@ public class ExpandableByteBuf {
         buf.position(oldPos);
         buf.order(ByteOrder.LITTLE_ENDIAN);
     }
-
-    /**
-     * Unwrap to ByteBuffer.
-     *
-     * @return internal buffer.
-     */
-    ByteBuffer unwrap() {
-        return buf;
-    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index c9e27d5..8e2553e 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -55,7 +55,6 @@ public class Row implements BinaryRow {
 
         this.row = row;
         this.schema = schema;
-
     }
 
     /**
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
index d07f621..76ded31 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
@@ -34,8 +34,6 @@ import org.apache.ignite.internal.schema.NativeTypeSpec;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 
-import static org.apache.ignite.internal.schema.BinaryRow.KEY_CHUNK_OFFSET;
-import static 
org.apache.ignite.internal.schema.BinaryRow.KEY_HASH_FIELD_OFFSET;
 import static 
org.apache.ignite.internal.schema.BinaryRow.RowFlags.KEY_FLAGS_OFFSET;
 import static 
org.apache.ignite.internal.schema.BinaryRow.RowFlags.VAL_FLAGS_OFFSET;
 
@@ -51,41 +49,52 @@ public class RowAssembler {
     /** Schema. */
     private final SchemaDescriptor schema;
 
+    /** The number of non-null varlen columns in values chunk. */
+    private final int varVartblLen;
+
     /** Target byte buffer to write to. */
     private final ExpandableByteBuf buf;
 
-    /** The number of non-null varlen columns in values chunk. */
-    private final int valVarlenCols;
-
     /** Current columns chunk. */
     private Columns curCols;
 
     /** Current field index (the field is unset). */
     private int curCol;
 
-    /** Hash. */
-    private int hash;
-
-    /** Flags. */
-    private short flags;
+    /** Current offset for the next column to be appended. */
+    private int curOff;
 
-    /** Charset encoder for strings. Initialized lazily. */
-    private CharsetEncoder strEncoder;
+    /** Index of the current varlen table entry. Incremented each time 
non-null varlen column is appended. */
+    private int curVartblEntry;
 
-    /** Base offset of the chunk */
+    /** Base offset of the current chunk */
     private int baseOff;
 
-    /** Offset of the varlen table for the chunk. */
+    /** Offset of the null-map for current chunk. */
+    private int nullMapOff;
+
+    /** Offset of the varlen table for current chunk. */
     private int varTblOff;
 
-    /** Offset of data for the chunk. */
+    /** Offset of data for current chunk. */
     private int dataOff;
 
-    /** Index of the current varlen table entry. Incremented each time 
non-null varlen column is appended. */
-    protected int curVartblEntry;
+    /** Row hashcode. */
+    private int keyHash;
 
-    /** Current offset for the next column to be appended. */
-    protected int curOff;
+    /** Flags. */
+    private short flags;
+
+    /** Charset encoder for strings. Initialized lazily. */
+    private CharsetEncoder strEncoder;
+
+    /**
+     * @param entries Number of non-null varlen columns.
+     * @return Total size of the varlen table.
+     */
+    private static int varTableChunkLength(int entries, int entrySize) {
+        return entries <= 1 ? 0 : Short.BYTES + (entries - 1) * entrySize;
+    }
 
     /**
      * Calculates encoded string length.
@@ -142,41 +151,40 @@ public class RowAssembler {
      * RowAssembler will apply optimizations based on chunks sizes estimations.
      *
      * @param schema Row schema.
-     * @param keyDataSize Key payload size. Estimated upper-bound or zero if 
unknown.
+     * @param keyVarlenSize Key payload size. Estimated upper-bound or zero if 
unknown.
      * @param keyVarlenCols Number of non-null varlen columns in key chunk.
-     * @param valDataSize Value data size. Estimated upper-bound or zero if 
unknown.
+     * @param valVarlenSize Value data size. Estimated upper-bound or zero if 
unknown.
      * @param valVarlenCols Number of non-null varlen columns in value chunk.
      */
     public RowAssembler(
         SchemaDescriptor schema,
-        int keyDataSize,
+        int keyVarlenSize,
         int keyVarlenCols,
-        int valDataSize,
+        int valVarlenSize,
         int valVarlenCols
     ) {
         this.schema = schema;
-        this.valVarlenCols = valVarlenCols;
 
         curCols = schema.keyColumns();
         curCol = 0;
-        hash = 0;
+        keyHash = 0;
         strEncoder = null;
 
-        final int keyNullMapSize = schema.keyColumns().nullMapSize();
-        final int valNullMapSize = schema.valueColumns().nullMapSize();
+        int keyVartblLen = varTableChunkLength(keyVarlenCols, Integer.BYTES);
+        varVartblLen = varTableChunkLength(valVarlenCols, Integer.BYTES);
 
-        int size = BinaryRow.HEADER_SIZE +
-            +keyDataSize + keyNullMapSize + varTableLength(keyVarlenCols, 
Integer.BYTES) +
-            valDataSize + valNullMapSize + varTableLength(valVarlenCols, 
Integer.BYTES);
+        initChunk(BinaryRow.KEY_CHUNK_OFFSET, curCols.nullMapSize(), 
keyVartblLen);
 
-        buf = new ExpandableByteBuf(size);
-        buf.putShort(0, (short)schema.version());
+        final Columns valCols = schema.valueColumns();
 
-        initChunk(KEY_CHUNK_OFFSET, keyNullMapSize, keyVarlenCols);
-    }
+        int size = BinaryRow.HEADER_SIZE + 2 * BinaryRow.CHUNK_LEN_FLD_SIZE +
+            keyVarlenSize + valVarlenSize +
+            keyVartblLen + varVartblLen +
+            curCols.fixsizeMaxLen() + valCols.fixsizeMaxLen() +
+            curCols.nullMapSize() + valCols.nullMapSize();
 
-    private int varTableLength(int entries, int entrySize) {
-        return entries <= 1 ? 0 : Short.BYTES + entries * entrySize;
+        buf = new ExpandableByteBuf(size);
+        buf.putShort(0, (short)schema.version());
     }
 
     /**
@@ -185,39 +193,20 @@ public class RowAssembler {
      * @return {@code this} for chaining.
      */
     public RowAssembler appendNull() {
-        Column col = curCols.column(curCol);
+        if (!curCols.column(curCol).nullable())
+            throw new IllegalArgumentException("Failed to set column (null was 
passed, but column is not nullable): " + curCols.column(curCol));
 
-        if (!col.nullable())
-            throw new IllegalArgumentException("Failed to set column (null was 
passed, but column is not nullable): " +
-                col);
+        setNull(curCol);
 
         if (isKeyColumn())
-            hash = 31 * hash;
-
-        setNull(curCol);
+            keyHash *= 31;
 
-        shiftColumn();
+        shiftColumn(0);
 
         return this;
     }
 
     /**
-     * Sets null flag in the null-map for the given column.
-     *
-     * @param colIdx Column index.
-     */
-    private void setNull(int colIdx) {
-        assert nullmapOff() < varTblOff : "Null-map is omitted.";
-
-        int byteInMap = colIdx >> 3; // Equivalent expression for: colIidx / 8
-        int bitInByte = colIdx & 7; // Equivalent expression for: colIdx % 8
-
-        buf.ensureCapacity(nullmapOff() + byteInMap + 1);
-
-        buf.put(nullmapOff() + byteInMap, (byte)(buf.get(nullmapOff() + 
byteInMap) | (1 << bitInByte)));
-    }
-
-    /**
      * Appends byte value for the current column to the chunk.
      *
      * @param val Column value.
@@ -226,14 +215,12 @@ public class RowAssembler {
     public RowAssembler appendByte(byte val) {
         checkType(NativeTypes.BYTE);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Byte.hashCode(val);
-
         buf.put(curOff, val);
 
-        curOff += NativeTypes.BYTE.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Byte.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.BYTE.sizeInBytes());
 
         return this;
     }
@@ -247,14 +234,12 @@ public class RowAssembler {
     public RowAssembler appendShort(short val) {
         checkType(NativeTypes.SHORT);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Short.hashCode(val);
-
         buf.putShort(curOff, val);
 
-        curOff += NativeTypes.SHORT.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Short.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.SHORT.sizeInBytes());
 
         return this;
     }
@@ -268,14 +253,12 @@ public class RowAssembler {
     public RowAssembler appendInt(int val) {
         checkType(NativeTypes.INTEGER);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Integer.hashCode(val);
-
         buf.putInt(curOff, val);
 
-        curOff += NativeTypes.INTEGER.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Integer.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.INTEGER.sizeInBytes());
 
         return this;
     }
@@ -289,14 +272,12 @@ public class RowAssembler {
     public RowAssembler appendLong(long val) {
         checkType(NativeTypes.LONG);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Long.hashCode(val);
-
         buf.putLong(curOff, val);
 
-        curOff += NativeTypes.LONG.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Long.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.LONG.sizeInBytes());
 
         return this;
     }
@@ -310,14 +291,12 @@ public class RowAssembler {
     public RowAssembler appendFloat(float val) {
         checkType(NativeTypes.FLOAT);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Float.hashCode(val);
-
         buf.putFloat(curOff, val);
 
-        curOff += NativeTypes.FLOAT.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Float.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.FLOAT.sizeInBytes());
 
         return this;
     }
@@ -331,14 +310,12 @@ public class RowAssembler {
     public RowAssembler appendDouble(double val) {
         checkType(NativeTypes.DOUBLE);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Double.hashCode(val);
-
         buf.putDouble(curOff, val);
 
-        curOff += NativeTypes.DOUBLE.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Double.hashCode(val);
 
-        shiftColumn();
+        shiftColumn(NativeTypes.DOUBLE.sizeInBytes());
 
         return this;
     }
@@ -352,15 +329,13 @@ public class RowAssembler {
     public RowAssembler appendUuid(UUID uuid) {
         checkType(NativeTypes.UUID);
 
-        if (isKeyColumn())
-            hash = 31 * hash + uuid.hashCode();
-
         buf.putLong(curOff, uuid.getLeastSignificantBits());
         buf.putLong(curOff + 8, uuid.getMostSignificantBits());
 
-        curOff += NativeTypes.UUID.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + uuid.hashCode();
 
-        shiftColumn();
+        shiftColumn(NativeTypes.UUID.sizeInBytes());
 
         return this;
     }
@@ -374,23 +349,23 @@ public class RowAssembler {
     public RowAssembler appendString(String val) {
         checkType(NativeTypes.STRING);
 
-        if (isKeyColumn())
-            hash = 31 * hash + val.hashCode();
-
         try {
             int written = buf.putString(curOff, val, encoder());
 
             writeVarlenOffset(curVartblEntry, curOff - dataOff);
 
+            if (isKeyColumn())
+                keyHash = 31 * keyHash + val.hashCode();
+
             curVartblEntry++;
-            curOff += written;
+
+            shiftColumn(written);
+
+            return this;
         }
         catch (CharacterCodingException e) {
             throw new AssemblyException("Failed to encode string", e);
         }
-        shiftColumn();
-
-        return this;
     }
 
     /**
@@ -402,17 +377,16 @@ public class RowAssembler {
     public RowAssembler appendBytes(byte[] val) {
         checkType(NativeTypes.BYTES);
 
-        if (isKeyColumn())
-            hash = 31 * hash + Arrays.hashCode(val);
-
         buf.putBytes(curOff, val);
 
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + Arrays.hashCode(val);
+
         writeVarlenOffset(curVartblEntry, curOff - dataOff);
 
         curVartblEntry++;
-        curOff += val.length;
 
-        shiftColumn();
+        shiftColumn(val.length);
 
         return this;
     }
@@ -434,9 +408,6 @@ public class RowAssembler {
             throw new IllegalArgumentException("Failed to set bitmask for 
column '" + col.name() + "' " +
                 "(mask size exceeds allocated size) [mask=" + bitSet + ", 
maxSize=" + maskType.bits() + "]");
 
-        if (isKeyColumn())
-            hash = 31 * hash + bitSet.hashCode();
-
         byte[] arr = bitSet.toByteArray();
 
         buf.putBytes(curOff, arr);
@@ -444,9 +415,10 @@ public class RowAssembler {
         for (int i = 0; i < maskType.sizeInBytes() - arr.length; i++)
             buf.put(curOff + arr.length + i, (byte)0);
 
-        curOff += maskType.sizeInBytes();
+        if (isKeyColumn())
+            keyHash = 31 * keyHash + bitSet.hashCode();
 
-        shiftColumn();
+        shiftColumn(maskType.sizeInBytes());
 
         return this;
     }
@@ -459,7 +431,7 @@ public class RowAssembler {
             throw new AssemblyException("Key column missed: colIdx=" + curCol);
         else {
             if (curCol == 0) {
-                flags &= ~(RowFlags.CHUNK_FLAGS_MASK << VAL_FLAGS_OFFSET);
+//                flags &= ~(RowFlags.CHUNK_FLAGS_MASK << VAL_FLAGS_OFFSET);
                 flags |= RowFlags.NO_VALUE_FLAG;
             }
             else if (schema.valueColumns().length() != curCol)
@@ -467,19 +439,12 @@ public class RowAssembler {
         }
 
         buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags);
-        buf.putInt(KEY_HASH_FIELD_OFFSET, hash);
+        buf.putInt(BinaryRow.KEY_HASH_FIELD_OFFSET, keyHash);
 
         return buf.toArray();
     }
 
     /**
-     * @return {@code true} if current column is a key column, {@code false} 
otherwise.
-     */
-    private boolean isKeyColumn() {
-        return curCols == schema.keyColumns();
-    }
-
-    /**
      * @return UTF-8 string encoder.
      */
     private CharsetEncoder encoder() {
@@ -490,6 +455,19 @@ public class RowAssembler {
     }
 
     /**
+     * Writes the given offset to the varlen table entry with the given index.
+     *
+     * @param entryIdx Vartable entry index.
+     * @param off Offset to write.
+     */
+    private void writeVarlenOffset(int entryIdx, int off) {
+        if (entryIdx == 0)
+            return; // Omit offset for very first varlen.
+
+        buf.putInt(varTblOff + Short.BYTES + (entryIdx - 1) * Integer.BYTES, 
off);
+    }
+
+    /**
      * Checks that the type being appended matches the column type.
      *
      * @param type Type spec that is attempted to be appended.
@@ -512,23 +490,39 @@ public class RowAssembler {
     }
 
     /**
+     * Sets null flag in the null-map for the given column.
+     *
+     * @param colIdx Column index.
+     */
+    private void setNull(int colIdx) {
+//        assert nullMapOff < varTblOff : "Null-map is omitted.";
+
+        int byteInMap = colIdx / 8;// >> 3; // Equivalent expression for: 
colIidx / 8
+        int bitInByte = colIdx % 8; //& 7; // Equivalent expression for: 
colIdx % 8
+
+        buf.ensureCapacity(nullMapOff + byteInMap + 1);
+
+        buf.put(nullMapOff + byteInMap, (byte)(buf.get(nullMapOff + byteInMap) 
| (1 << bitInByte)));
+    }
+
+    /**
      * Shifts current column indexes as necessary, also
      * switch to value chunk writer when moving from key to value columns.
      */
-    private void shiftColumn() {
+    private void shiftColumn(int size) {
         curCol++;
+        curOff += size;
 
         if (curCol == curCols.length()) {
             // Write sizes.
-            final int size = curOff - baseOff;
+            final int chunkLen = curOff - baseOff;
 
-            buf.putInt(baseOff, size);
+            buf.putInt(baseOff, chunkLen);
 
             if (curVartblEntry > 1) {
-                assert varTblOff < dataOff :
-                    "Illegal writing of varlen when 'omit vartable' flag is 
set for a chunk.";
-
-                assert varTblOff + varTableLength(curVartblEntry - 1, 
Integer.BYTES) == dataOff : "Vartable overlow: size=" + curVartblEntry;
+//                assert varTblOff < dataOff : "Illegal writing of varlen when 
'omit vartable' flag is set for a chunk.";
+//
+//                assert varTblOff + varTableLength(curVartblEntry, 
Integer.BYTES) == dataOff : "Vartable overlow: size=" + curVartblEntry;
 
                 buf.putShort(varTblOff, (short)(curVartblEntry - 1));
             }
@@ -538,49 +532,28 @@ public class RowAssembler {
 
             // Switch key->value columns.
             curCols = schema.valueColumns();
-            curCol = 0;
 
             // Create value chunk writer.
-            initChunk(BinaryRow.HEADER_SIZE + size/* Key chunk size */, 
curCols.nullMapSize(), valVarlenCols);
+            initChunk(BinaryRow.HEADER_SIZE + chunkLen/* Key chunk size */, 
curCols.nullMapSize(), varVartblLen);
         }
     }
 
     /**
-     * @return Null-map offset.
-     */
-    private int nullmapOff() {
-        return baseOff + BinaryRow.CHUNK_LEN_FLD_SIZE;
-    }
-
-    /**
-     * Writes the given offset to the varlen table entry with the given index.
-     *
-     * @param entryIdx Vartable entry index.
-     * @param off Offset to write.
-     */
-    protected void writeVarlenOffset(int entryIdx, int off) {
-        if (entryIdx == 0)
-            return; // Omit offset for very first varlen.
-
-        buf.putInt(varTblOff + Short.BYTES + (entryIdx - 1) * Integer.BYTES, 
off);
-    }
-
-    /**
      * Init chunk offsets and flags.
      *
      * @param baseOff Chunk base offset.
      * @param nullMapLen Null-map length in bytes.
-     * @param vartblEntries Vartable entries.
+     * @param vartblLen Vartable length in bytes.
      */
-    private void initChunk(int baseOff, int nullMapLen, int vartblEntries) {
+    private void initChunk(int baseOff, int nullMapLen, int vartblLen) {
         this.baseOff = baseOff;
 
-        final int vartblLen = varTableLength(vartblEntries, Integer.BYTES);
-
-        varTblOff = nullmapOff() + nullMapLen;
+        nullMapOff = baseOff + BinaryRow.CHUNK_LEN_FLD_SIZE;
+        varTblOff = nullMapOff + nullMapLen;
         dataOff = varTblOff + vartblLen;
         curOff = dataOff;
         curVartblEntry = 0;
+        curCol = 0;
 
         int flags = 0;
 
@@ -590,6 +563,13 @@ public class RowAssembler {
         if (vartblLen == 0)
             flags |= VarTableFormat.OMIT_VARTBL_FLAG;
 
-        this.flags |= flags << (baseOff == KEY_CHUNK_OFFSET ? KEY_FLAGS_OFFSET 
: VAL_FLAGS_OFFSET);
+        this.flags |= flags << (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? 
KEY_FLAGS_OFFSET : VAL_FLAGS_OFFSET);
+    }
+
+    /**
+     * @return {@code true} if current column is a key column, {@code false} 
otherwise.
+     */
+    private boolean isKeyColumn() {
+        return baseOff == BinaryRow.KEY_CHUNK_OFFSET;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index dc0b59a..dbca237 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -62,10 +62,7 @@ public class TupleMarshallerImpl implements TupleMarshaller {
 
         validate(keyTuple, schema.keyColumns());
 
-        TupleStatistics keyStat = tupleStatistics(schema.keyColumns(), 
keyTuple);
-        TupleStatistics valStat = tupleStatistics(schema.valueColumns(), 
valTuple);
-
-        final RowAssembler rowBuilder = createAssembler(schema, keyStat, 
valStat);
+        final RowAssembler rowBuilder = createAssembler(schema, keyTuple, 
valTuple);
 
         for (int i = 0; i < schema.keyColumns().length(); i++) {
             final Column col = schema.keyColumns().column(i);
@@ -87,39 +84,6 @@ public class TupleMarshallerImpl implements TupleMarshaller {
     }
 
     /**
-     * Analyze given tuple and gather statistics.
-     *
-     * @param cols Columns which statistics is calculated for.
-     * @param tup Tuple to analyze.
-     * @return Tuple statistics.
-     */
-    private TupleStatistics tupleStatistics(Columns cols, Tuple tup) {
-        if (tup == null)
-            return new TupleStatistics();
-
-        TupleStatistics chunk = new TupleStatistics();
-
-        chunk.payloadLen = cols.fixsizeMaxLen();
-
-        if (!cols.hasVarlengthColumns())
-            return chunk;
-
-        for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) {
-            Column col = cols.column(i);
-
-            Object val = (tup.contains(col.name())) ? tup.value(col.name()) : 
col.defaultValue();
-
-            if (val == null)
-                continue;
-
-            chunk.nonNullVarlen++;
-            chunk.payloadLen += getValueSize(val, col.type());
-        }
-
-        return chunk;
-    }
-
-    /**
      * Validates columns values.
      *
      * @param tuple Tuple to validate.
@@ -143,16 +107,19 @@ public class TupleMarshallerImpl implements 
TupleMarshaller {
     /**
      * Creates {@link RowAssembler} for key-value tuples.
      *
-     * @param keyStat Key tuple statistics.
-     * @param valStat Value tuple statistics.
+     * @param keyTuple Key tuple.
+     * @param valTuple Value tuple.
      * @return Row assembler.
      */
-    private RowAssembler createAssembler(SchemaDescriptor schema, 
TupleStatistics keyStat, TupleStatistics valStat) {
+    private RowAssembler createAssembler(SchemaDescriptor schema, Tuple 
keyTuple, Tuple valTuple) {
+        TupleStatistics keyStat = tupleStatistics(schema.keyColumns(), 
keyTuple);
+        TupleStatistics valStat = tupleStatistics(schema.valueColumns(), 
valTuple);
+
         return new RowAssembler(
             schema,
-            keyStat.payloadLen,
+            keyStat.nonNullVarLenSize,
             keyStat.nonNullVarlen,
-            valStat.payloadLen,
+            valStat.nonNullVarLenSize,
             valStat.nonNullVarlen);
     }
 
@@ -162,7 +129,12 @@ public class TupleMarshallerImpl implements 
TupleMarshaller {
      * @param tup Tuple.
      */
     private void writeColumn(RowAssembler rowAsm, Column col, Tuple tup) {
-        Object val = tup.contains(col.name()) ? tup.value(col.name()) : 
col.defaultValue();
+        Object val;
+
+        if (!tup.contains(col.name()))
+            val = col.defaultValue();
+        else
+            val = tup.value(col.name());
 
         if (val == null) {
             rowAsm.appendNull();
@@ -227,13 +199,50 @@ public class TupleMarshallerImpl implements 
TupleMarshaller {
     }
 
     /**
+     * Analyze given tuple and gather statistics.
+     *
+     * @param cols Columns which statistics is calculated for.
+     * @param tup Tuple to analyze.
+     * @return Tuple statistics.
+     */
+    private TupleStatistics tupleStatistics(Columns cols, Tuple tup) {
+        if (tup == null || !cols.hasVarlengthColumns())
+            return TupleStatistics.ZERO_VARLEN_STATISTICS;
+
+        int cnt = 0;
+        int size = 0;
+
+        for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) {
+            Column col = cols.column(i);
+
+            final Object val = tup.contains(col.name()) ? 
tup.value(col.name()) : col.defaultValue();
+
+            if (val == null)
+                continue;
+
+            size += getValueSize(val, col.type());
+            cnt++;
+        }
+
+        return new TupleStatistics(cnt, size);
+    }
+
+    /**
      * Tuple statistics record.
      */
     private static class TupleStatistics {
-        /** Payload length in bytes. */
-        int payloadLen;
+        /** Cached zero statistics. */
+        static final TupleStatistics ZERO_VARLEN_STATISTICS = new 
TupleStatistics(0,0);
 
         /** Number of non-null varlen columns. */
         int nonNullVarlen;
+
+        /** Length of all non-null fields of varlen types. */
+        int nonNullVarLenSize;
+
+        public TupleStatistics(int nonNullVarlen, int nonNullVarLenSize) {
+            this.nonNullVarlen = nonNullVarlen;
+            this.nonNullVarLenSize = nonNullVarLenSize;
+        }
     }
 }

Reply via email to