This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-14743-compaction in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit fae6c3aace4dde9cc49e80fa1373b1a5713c99cb Author: Andrew Mashenkov <[email protected]> AuthorDate: Thu Jun 24 12:39:18 2021 +0300 Minor. --- .../marshaller/reflection/JavaSerializer.java | 15 +- .../internal/schema/row/ExpandableByteBuf.java | 9 - .../org/apache/ignite/internal/schema/row/Row.java | 1 - .../ignite/internal/schema/row/RowAssembler.java | 292 ++++++++++----------- .../ignite/internal/table/TupleMarshallerImpl.java | 99 +++---- 5 files changed, 199 insertions(+), 217 deletions(-) diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java index 2e4f4b3..06ac189 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java @@ -85,7 +85,7 @@ public class JavaSerializer extends AbstractSerializer { ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), keyMarsh, key); ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), valMarsh, val); - return new RowAssembler(schema, keyStat.maxChunkDataSize, keyStat.nonNullCols, valStat.maxChunkDataSize, valStat.nonNullCols); + return new RowAssembler(schema, keyStat.nonNullColsSize, keyStat.nonNullCols, valStat.nonNullColsSize, valStat.nonNullCols); } /** @@ -98,10 +98,10 @@ public class JavaSerializer extends AbstractSerializer { */ private ObjectStatistic collectObjectStats(Columns cols, Marshaller marsh, Object obj) { if (obj == null || !cols.hasVarlengthColumns()) - return new ObjectStatistic(0, cols.fixsizeMaxLen()); + return ObjectStatistic.ZERO_VARLEN_STATISTICS; int cnt = 0; - int size = cols.fixsizeMaxLen(); + int size = 0; for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) { final Object val = marsh.value(obj, i); @@ -138,16 +138,19 @@ public class JavaSerializer extends AbstractSerializer { * Object statistic. */ private static class ObjectStatistic { + /** Cached zero statistics. */ + static final ObjectStatistic ZERO_VARLEN_STATISTICS = new ObjectStatistic(0,0); + /** Non-null columns of varlen type. */ int nonNullCols; /** Length of all non-null columns of varlen types. */ - int maxChunkDataSize; + int nonNullColsSize; /** Constructor. */ - ObjectStatistic(int nonNullCols, int maxRowSize) { + ObjectStatistic(int nonNullCols, int nonNullColsSize) { this.nonNullCols = nonNullCols; - this.maxChunkDataSize = maxRowSize; + this.nonNullColsSize = nonNullColsSize; } } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java index 74f5099..159153e 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/ExpandableByteBuf.java @@ -280,13 +280,4 @@ public class ExpandableByteBuf { buf.position(oldPos); buf.order(ByteOrder.LITTLE_ENDIAN); } - - /** - * Unwrap to ByteBuffer. - * - * @return internal buffer. - */ - ByteBuffer unwrap() { - return buf; - } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java index c9e27d5..8e2553e 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java @@ -55,7 +55,6 @@ public class Row implements BinaryRow { this.row = row; this.schema = schema; - } /** diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java index d07f621..76ded31 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java @@ -34,8 +34,6 @@ import org.apache.ignite.internal.schema.NativeTypeSpec; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; -import static org.apache.ignite.internal.schema.BinaryRow.KEY_CHUNK_OFFSET; -import static org.apache.ignite.internal.schema.BinaryRow.KEY_HASH_FIELD_OFFSET; import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.KEY_FLAGS_OFFSET; import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.VAL_FLAGS_OFFSET; @@ -51,41 +49,52 @@ public class RowAssembler { /** Schema. */ private final SchemaDescriptor schema; + /** The number of non-null varlen columns in values chunk. */ + private final int varVartblLen; + /** Target byte buffer to write to. */ private final ExpandableByteBuf buf; - /** The number of non-null varlen columns in values chunk. */ - private final int valVarlenCols; - /** Current columns chunk. */ private Columns curCols; /** Current field index (the field is unset). */ private int curCol; - /** Hash. */ - private int hash; - - /** Flags. */ - private short flags; + /** Current offset for the next column to be appended. */ + private int curOff; - /** Charset encoder for strings. Initialized lazily. */ - private CharsetEncoder strEncoder; + /** Index of the current varlen table entry. Incremented each time non-null varlen column is appended. */ + private int curVartblEntry; - /** Base offset of the chunk */ + /** Base offset of the current chunk */ private int baseOff; - /** Offset of the varlen table for the chunk. */ + /** Offset of the null-map for current chunk. */ + private int nullMapOff; + + /** Offset of the varlen table for current chunk. */ private int varTblOff; - /** Offset of data for the chunk. */ + /** Offset of data for current chunk. */ private int dataOff; - /** Index of the current varlen table entry. Incremented each time non-null varlen column is appended. */ - protected int curVartblEntry; + /** Row hashcode. */ + private int keyHash; - /** Current offset for the next column to be appended. */ - protected int curOff; + /** Flags. */ + private short flags; + + /** Charset encoder for strings. Initialized lazily. */ + private CharsetEncoder strEncoder; + + /** + * @param entries Number of non-null varlen columns. + * @return Total size of the varlen table. + */ + private static int varTableChunkLength(int entries, int entrySize) { + return entries <= 1 ? 0 : Short.BYTES + (entries - 1) * entrySize; + } /** * Calculates encoded string length. @@ -142,41 +151,40 @@ public class RowAssembler { * RowAssembler will apply optimizations based on chunks sizes estimations. * * @param schema Row schema. - * @param keyDataSize Key payload size. Estimated upper-bound or zero if unknown. + * @param keyVarlenSize Key payload size. Estimated upper-bound or zero if unknown. * @param keyVarlenCols Number of non-null varlen columns in key chunk. - * @param valDataSize Value data size. Estimated upper-bound or zero if unknown. + * @param valVarlenSize Value data size. Estimated upper-bound or zero if unknown. * @param valVarlenCols Number of non-null varlen columns in value chunk. */ public RowAssembler( SchemaDescriptor schema, - int keyDataSize, + int keyVarlenSize, int keyVarlenCols, - int valDataSize, + int valVarlenSize, int valVarlenCols ) { this.schema = schema; - this.valVarlenCols = valVarlenCols; curCols = schema.keyColumns(); curCol = 0; - hash = 0; + keyHash = 0; strEncoder = null; - final int keyNullMapSize = schema.keyColumns().nullMapSize(); - final int valNullMapSize = schema.valueColumns().nullMapSize(); + int keyVartblLen = varTableChunkLength(keyVarlenCols, Integer.BYTES); + varVartblLen = varTableChunkLength(valVarlenCols, Integer.BYTES); - int size = BinaryRow.HEADER_SIZE + - +keyDataSize + keyNullMapSize + varTableLength(keyVarlenCols, Integer.BYTES) + - valDataSize + valNullMapSize + varTableLength(valVarlenCols, Integer.BYTES); + initChunk(BinaryRow.KEY_CHUNK_OFFSET, curCols.nullMapSize(), keyVartblLen); - buf = new ExpandableByteBuf(size); - buf.putShort(0, (short)schema.version()); + final Columns valCols = schema.valueColumns(); - initChunk(KEY_CHUNK_OFFSET, keyNullMapSize, keyVarlenCols); - } + int size = BinaryRow.HEADER_SIZE + 2 * BinaryRow.CHUNK_LEN_FLD_SIZE + + keyVarlenSize + valVarlenSize + + keyVartblLen + varVartblLen + + curCols.fixsizeMaxLen() + valCols.fixsizeMaxLen() + + curCols.nullMapSize() + valCols.nullMapSize(); - private int varTableLength(int entries, int entrySize) { - return entries <= 1 ? 0 : Short.BYTES + entries * entrySize; + buf = new ExpandableByteBuf(size); + buf.putShort(0, (short)schema.version()); } /** @@ -185,39 +193,20 @@ public class RowAssembler { * @return {@code this} for chaining. */ public RowAssembler appendNull() { - Column col = curCols.column(curCol); + if (!curCols.column(curCol).nullable()) + throw new IllegalArgumentException("Failed to set column (null was passed, but column is not nullable): " + curCols.column(curCol)); - if (!col.nullable()) - throw new IllegalArgumentException("Failed to set column (null was passed, but column is not nullable): " + - col); + setNull(curCol); if (isKeyColumn()) - hash = 31 * hash; - - setNull(curCol); + keyHash *= 31; - shiftColumn(); + shiftColumn(0); return this; } /** - * Sets null flag in the null-map for the given column. - * - * @param colIdx Column index. - */ - private void setNull(int colIdx) { - assert nullmapOff() < varTblOff : "Null-map is omitted."; - - int byteInMap = colIdx >> 3; // Equivalent expression for: colIidx / 8 - int bitInByte = colIdx & 7; // Equivalent expression for: colIdx % 8 - - buf.ensureCapacity(nullmapOff() + byteInMap + 1); - - buf.put(nullmapOff() + byteInMap, (byte)(buf.get(nullmapOff() + byteInMap) | (1 << bitInByte))); - } - - /** * Appends byte value for the current column to the chunk. * * @param val Column value. @@ -226,14 +215,12 @@ public class RowAssembler { public RowAssembler appendByte(byte val) { checkType(NativeTypes.BYTE); - if (isKeyColumn()) - hash = 31 * hash + Byte.hashCode(val); - buf.put(curOff, val); - curOff += NativeTypes.BYTE.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Byte.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.BYTE.sizeInBytes()); return this; } @@ -247,14 +234,12 @@ public class RowAssembler { public RowAssembler appendShort(short val) { checkType(NativeTypes.SHORT); - if (isKeyColumn()) - hash = 31 * hash + Short.hashCode(val); - buf.putShort(curOff, val); - curOff += NativeTypes.SHORT.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Short.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.SHORT.sizeInBytes()); return this; } @@ -268,14 +253,12 @@ public class RowAssembler { public RowAssembler appendInt(int val) { checkType(NativeTypes.INTEGER); - if (isKeyColumn()) - hash = 31 * hash + Integer.hashCode(val); - buf.putInt(curOff, val); - curOff += NativeTypes.INTEGER.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Integer.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.INTEGER.sizeInBytes()); return this; } @@ -289,14 +272,12 @@ public class RowAssembler { public RowAssembler appendLong(long val) { checkType(NativeTypes.LONG); - if (isKeyColumn()) - hash = 31 * hash + Long.hashCode(val); - buf.putLong(curOff, val); - curOff += NativeTypes.LONG.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Long.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.LONG.sizeInBytes()); return this; } @@ -310,14 +291,12 @@ public class RowAssembler { public RowAssembler appendFloat(float val) { checkType(NativeTypes.FLOAT); - if (isKeyColumn()) - hash = 31 * hash + Float.hashCode(val); - buf.putFloat(curOff, val); - curOff += NativeTypes.FLOAT.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Float.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.FLOAT.sizeInBytes()); return this; } @@ -331,14 +310,12 @@ public class RowAssembler { public RowAssembler appendDouble(double val) { checkType(NativeTypes.DOUBLE); - if (isKeyColumn()) - hash = 31 * hash + Double.hashCode(val); - buf.putDouble(curOff, val); - curOff += NativeTypes.DOUBLE.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + Double.hashCode(val); - shiftColumn(); + shiftColumn(NativeTypes.DOUBLE.sizeInBytes()); return this; } @@ -352,15 +329,13 @@ public class RowAssembler { public RowAssembler appendUuid(UUID uuid) { checkType(NativeTypes.UUID); - if (isKeyColumn()) - hash = 31 * hash + uuid.hashCode(); - buf.putLong(curOff, uuid.getLeastSignificantBits()); buf.putLong(curOff + 8, uuid.getMostSignificantBits()); - curOff += NativeTypes.UUID.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + uuid.hashCode(); - shiftColumn(); + shiftColumn(NativeTypes.UUID.sizeInBytes()); return this; } @@ -374,23 +349,23 @@ public class RowAssembler { public RowAssembler appendString(String val) { checkType(NativeTypes.STRING); - if (isKeyColumn()) - hash = 31 * hash + val.hashCode(); - try { int written = buf.putString(curOff, val, encoder()); writeVarlenOffset(curVartblEntry, curOff - dataOff); + if (isKeyColumn()) + keyHash = 31 * keyHash + val.hashCode(); + curVartblEntry++; - curOff += written; + + shiftColumn(written); + + return this; } catch (CharacterCodingException e) { throw new AssemblyException("Failed to encode string", e); } - shiftColumn(); - - return this; } /** @@ -402,17 +377,16 @@ public class RowAssembler { public RowAssembler appendBytes(byte[] val) { checkType(NativeTypes.BYTES); - if (isKeyColumn()) - hash = 31 * hash + Arrays.hashCode(val); - buf.putBytes(curOff, val); + if (isKeyColumn()) + keyHash = 31 * keyHash + Arrays.hashCode(val); + writeVarlenOffset(curVartblEntry, curOff - dataOff); curVartblEntry++; - curOff += val.length; - shiftColumn(); + shiftColumn(val.length); return this; } @@ -434,9 +408,6 @@ public class RowAssembler { throw new IllegalArgumentException("Failed to set bitmask for column '" + col.name() + "' " + "(mask size exceeds allocated size) [mask=" + bitSet + ", maxSize=" + maskType.bits() + "]"); - if (isKeyColumn()) - hash = 31 * hash + bitSet.hashCode(); - byte[] arr = bitSet.toByteArray(); buf.putBytes(curOff, arr); @@ -444,9 +415,10 @@ public class RowAssembler { for (int i = 0; i < maskType.sizeInBytes() - arr.length; i++) buf.put(curOff + arr.length + i, (byte)0); - curOff += maskType.sizeInBytes(); + if (isKeyColumn()) + keyHash = 31 * keyHash + bitSet.hashCode(); - shiftColumn(); + shiftColumn(maskType.sizeInBytes()); return this; } @@ -459,7 +431,7 @@ public class RowAssembler { throw new AssemblyException("Key column missed: colIdx=" + curCol); else { if (curCol == 0) { - flags &= ~(RowFlags.CHUNK_FLAGS_MASK << VAL_FLAGS_OFFSET); +// flags &= ~(RowFlags.CHUNK_FLAGS_MASK << VAL_FLAGS_OFFSET); flags |= RowFlags.NO_VALUE_FLAG; } else if (schema.valueColumns().length() != curCol) @@ -467,19 +439,12 @@ public class RowAssembler { } buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags); - buf.putInt(KEY_HASH_FIELD_OFFSET, hash); + buf.putInt(BinaryRow.KEY_HASH_FIELD_OFFSET, keyHash); return buf.toArray(); } /** - * @return {@code true} if current column is a key column, {@code false} otherwise. - */ - private boolean isKeyColumn() { - return curCols == schema.keyColumns(); - } - - /** * @return UTF-8 string encoder. */ private CharsetEncoder encoder() { @@ -490,6 +455,19 @@ public class RowAssembler { } /** + * Writes the given offset to the varlen table entry with the given index. + * + * @param entryIdx Vartable entry index. + * @param off Offset to write. + */ + private void writeVarlenOffset(int entryIdx, int off) { + if (entryIdx == 0) + return; // Omit offset for very first varlen. + + buf.putInt(varTblOff + Short.BYTES + (entryIdx - 1) * Integer.BYTES, off); + } + + /** * Checks that the type being appended matches the column type. * * @param type Type spec that is attempted to be appended. @@ -512,23 +490,39 @@ public class RowAssembler { } /** + * Sets null flag in the null-map for the given column. + * + * @param colIdx Column index. + */ + private void setNull(int colIdx) { +// assert nullMapOff < varTblOff : "Null-map is omitted."; + + int byteInMap = colIdx / 8;// >> 3; // Equivalent expression for: colIidx / 8 + int bitInByte = colIdx % 8; //& 7; // Equivalent expression for: colIdx % 8 + + buf.ensureCapacity(nullMapOff + byteInMap + 1); + + buf.put(nullMapOff + byteInMap, (byte)(buf.get(nullMapOff + byteInMap) | (1 << bitInByte))); + } + + /** * Shifts current column indexes as necessary, also * switch to value chunk writer when moving from key to value columns. */ - private void shiftColumn() { + private void shiftColumn(int size) { curCol++; + curOff += size; if (curCol == curCols.length()) { // Write sizes. - final int size = curOff - baseOff; + final int chunkLen = curOff - baseOff; - buf.putInt(baseOff, size); + buf.putInt(baseOff, chunkLen); if (curVartblEntry > 1) { - assert varTblOff < dataOff : - "Illegal writing of varlen when 'omit vartable' flag is set for a chunk."; - - assert varTblOff + varTableLength(curVartblEntry - 1, Integer.BYTES) == dataOff : "Vartable overlow: size=" + curVartblEntry; +// assert varTblOff < dataOff : "Illegal writing of varlen when 'omit vartable' flag is set for a chunk."; +// +// assert varTblOff + varTableLength(curVartblEntry, Integer.BYTES) == dataOff : "Vartable overlow: size=" + curVartblEntry; buf.putShort(varTblOff, (short)(curVartblEntry - 1)); } @@ -538,49 +532,28 @@ public class RowAssembler { // Switch key->value columns. curCols = schema.valueColumns(); - curCol = 0; // Create value chunk writer. - initChunk(BinaryRow.HEADER_SIZE + size/* Key chunk size */, curCols.nullMapSize(), valVarlenCols); + initChunk(BinaryRow.HEADER_SIZE + chunkLen/* Key chunk size */, curCols.nullMapSize(), varVartblLen); } } /** - * @return Null-map offset. - */ - private int nullmapOff() { - return baseOff + BinaryRow.CHUNK_LEN_FLD_SIZE; - } - - /** - * Writes the given offset to the varlen table entry with the given index. - * - * @param entryIdx Vartable entry index. - * @param off Offset to write. - */ - protected void writeVarlenOffset(int entryIdx, int off) { - if (entryIdx == 0) - return; // Omit offset for very first varlen. - - buf.putInt(varTblOff + Short.BYTES + (entryIdx - 1) * Integer.BYTES, off); - } - - /** * Init chunk offsets and flags. * * @param baseOff Chunk base offset. * @param nullMapLen Null-map length in bytes. - * @param vartblEntries Vartable entries. + * @param vartblLen Vartable length in bytes. */ - private void initChunk(int baseOff, int nullMapLen, int vartblEntries) { + private void initChunk(int baseOff, int nullMapLen, int vartblLen) { this.baseOff = baseOff; - final int vartblLen = varTableLength(vartblEntries, Integer.BYTES); - - varTblOff = nullmapOff() + nullMapLen; + nullMapOff = baseOff + BinaryRow.CHUNK_LEN_FLD_SIZE; + varTblOff = nullMapOff + nullMapLen; dataOff = varTblOff + vartblLen; curOff = dataOff; curVartblEntry = 0; + curCol = 0; int flags = 0; @@ -590,6 +563,13 @@ public class RowAssembler { if (vartblLen == 0) flags |= VarTableFormat.OMIT_VARTBL_FLAG; - this.flags |= flags << (baseOff == KEY_CHUNK_OFFSET ? KEY_FLAGS_OFFSET : VAL_FLAGS_OFFSET); + this.flags |= flags << (baseOff == BinaryRow.KEY_CHUNK_OFFSET ? KEY_FLAGS_OFFSET : VAL_FLAGS_OFFSET); + } + + /** + * @return {@code true} if current column is a key column, {@code false} otherwise. + */ + private boolean isKeyColumn() { + return baseOff == BinaryRow.KEY_CHUNK_OFFSET; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java index dc0b59a..dbca237 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java @@ -62,10 +62,7 @@ public class TupleMarshallerImpl implements TupleMarshaller { validate(keyTuple, schema.keyColumns()); - TupleStatistics keyStat = tupleStatistics(schema.keyColumns(), keyTuple); - TupleStatistics valStat = tupleStatistics(schema.valueColumns(), valTuple); - - final RowAssembler rowBuilder = createAssembler(schema, keyStat, valStat); + final RowAssembler rowBuilder = createAssembler(schema, keyTuple, valTuple); for (int i = 0; i < schema.keyColumns().length(); i++) { final Column col = schema.keyColumns().column(i); @@ -87,39 +84,6 @@ public class TupleMarshallerImpl implements TupleMarshaller { } /** - * Analyze given tuple and gather statistics. - * - * @param cols Columns which statistics is calculated for. - * @param tup Tuple to analyze. - * @return Tuple statistics. - */ - private TupleStatistics tupleStatistics(Columns cols, Tuple tup) { - if (tup == null) - return new TupleStatistics(); - - TupleStatistics chunk = new TupleStatistics(); - - chunk.payloadLen = cols.fixsizeMaxLen(); - - if (!cols.hasVarlengthColumns()) - return chunk; - - for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) { - Column col = cols.column(i); - - Object val = (tup.contains(col.name())) ? tup.value(col.name()) : col.defaultValue(); - - if (val == null) - continue; - - chunk.nonNullVarlen++; - chunk.payloadLen += getValueSize(val, col.type()); - } - - return chunk; - } - - /** * Validates columns values. * * @param tuple Tuple to validate. @@ -143,16 +107,19 @@ public class TupleMarshallerImpl implements TupleMarshaller { /** * Creates {@link RowAssembler} for key-value tuples. * - * @param keyStat Key tuple statistics. - * @param valStat Value tuple statistics. + * @param keyTuple Key tuple. + * @param valTuple Value tuple. * @return Row assembler. */ - private RowAssembler createAssembler(SchemaDescriptor schema, TupleStatistics keyStat, TupleStatistics valStat) { + private RowAssembler createAssembler(SchemaDescriptor schema, Tuple keyTuple, Tuple valTuple) { + TupleStatistics keyStat = tupleStatistics(schema.keyColumns(), keyTuple); + TupleStatistics valStat = tupleStatistics(schema.valueColumns(), valTuple); + return new RowAssembler( schema, - keyStat.payloadLen, + keyStat.nonNullVarLenSize, keyStat.nonNullVarlen, - valStat.payloadLen, + valStat.nonNullVarLenSize, valStat.nonNullVarlen); } @@ -162,7 +129,12 @@ public class TupleMarshallerImpl implements TupleMarshaller { * @param tup Tuple. */ private void writeColumn(RowAssembler rowAsm, Column col, Tuple tup) { - Object val = tup.contains(col.name()) ? tup.value(col.name()) : col.defaultValue(); + Object val; + + if (!tup.contains(col.name())) + val = col.defaultValue(); + else + val = tup.value(col.name()); if (val == null) { rowAsm.appendNull(); @@ -227,13 +199,50 @@ public class TupleMarshallerImpl implements TupleMarshaller { } /** + * Analyze given tuple and gather statistics. + * + * @param cols Columns which statistics is calculated for. + * @param tup Tuple to analyze. + * @return Tuple statistics. + */ + private TupleStatistics tupleStatistics(Columns cols, Tuple tup) { + if (tup == null || !cols.hasVarlengthColumns()) + return TupleStatistics.ZERO_VARLEN_STATISTICS; + + int cnt = 0; + int size = 0; + + for (int i = cols.firstVarlengthColumn(); i < cols.length(); i++) { + Column col = cols.column(i); + + final Object val = tup.contains(col.name()) ? tup.value(col.name()) : col.defaultValue(); + + if (val == null) + continue; + + size += getValueSize(val, col.type()); + cnt++; + } + + return new TupleStatistics(cnt, size); + } + + /** * Tuple statistics record. */ private static class TupleStatistics { - /** Payload length in bytes. */ - int payloadLen; + /** Cached zero statistics. */ + static final TupleStatistics ZERO_VARLEN_STATISTICS = new TupleStatistics(0,0); /** Number of non-null varlen columns. */ int nonNullVarlen; + + /** Length of all non-null fields of varlen types. */ + int nonNullVarLenSize; + + public TupleStatistics(int nonNullVarlen, int nonNullVarLenSize) { + this.nonNullVarlen = nonNullVarlen; + this.nonNullVarLenSize = nonNullVarLenSize; + } } }
