This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 131c9b4459 IGNITE-20618 Sql. Degradation of SELECT operations
performance over time (#2728)
131c9b4459 is described below
commit 131c9b4459639e4b7a6ec5985e142fe25b4fccf2
Author: korlov42 <[email protected]>
AuthorDate: Wed Oct 25 10:38:11 2023 +0300
IGNITE-20618 Sql. Degradation of SELECT operations performance over time
(#2728)
---
.../internal/binarytuple/BinaryTupleBuilder.java | 17 ++-
.../ignite/internal/schema/BinaryTuplePrefix.java | 102 +++++++++++++
.../internal/schema/BinaryTuplePrefixTest.java | 77 ++++++++++
.../internal/sql/engine/exec/ExchangeService.java | 4 +-
.../sql/engine/exec/ExchangeServiceImpl.java | 4 +-
.../engine/exec/ExecutableTableRegistryImpl.java | 2 +-
.../internal/sql/engine/exec/RowConverter.java | 124 ----------------
.../internal/sql/engine/exec/RowHandler.java | 16 +-
.../sql/engine/exec/ScannableTableImpl.java | 63 ++++----
.../internal/sql/engine/exec/SqlRowHandler.java | 80 +++++++---
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 8 +-
.../internal/sql/engine/exec/rel/Outbox.java | 16 +-
.../sql/engine/message/QueryBatchMessage.java | 4 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 7 -
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 17 ++-
.../sql/engine/exec/rel/ExecutionTest.java | 28 ----
.../engine/exec/rel/ScannableTableSelfTest.java | 12 +-
.../exec/rel/TableScanNodeExecutionTest.java | 6 +-
.../sql/engine/exec/row/SqlRowHandlerTest.java | 44 +++---
.../sql/engine/framework/ArrayRowHandler.java | 165 +++++++++++++++++++--
20 files changed, 504 insertions(+), 292 deletions(-)
diff --git
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index c651536d6b..ae681d0d6a 100644
---
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -82,11 +82,24 @@ public class BinaryTupleBuilder {
* @param totalValueSize Total estimated length of non-NULL values, -1 if
not known.
*/
public BinaryTupleBuilder(int numElements, int totalValueSize) {
+ this(numElements, totalValueSize, true);
+ }
+
+ /**
+ * Creates a builder.
+ *
+ * @param numElements Number of tuple elements.
+ * @param totalValueSize Total estimated length of non-NULL values, -1 if
not known.
+ * @param exactEstimate Whether the total size is an exact estimate or an
approximate one.
+ * With an exact estimate the allocation will be optimal,
while with
+ * an approximate estimate some excess allocation is possible.
+ */
+ public BinaryTupleBuilder(int numElements, int totalValueSize, boolean
exactEstimate) {
this.numElements = numElements;
entryBase = BinaryTupleCommon.HEADER_SIZE;
- if (totalValueSize < 0) {
+ if (totalValueSize < 0 || !exactEstimate) {
entrySize = Integer.BYTES;
} else {
entrySize =
BinaryTupleCommon.flagsToEntrySize(BinaryTupleCommon.valueSizeToFlags(totalValueSize));
@@ -359,7 +372,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendBytes(byte[] value) {
+ public BinaryTupleBuilder appendBytes(byte @Nullable [] value) {
return value == null ? appendNull() : appendBytesNotNull(value);
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
index cecbb140d4..041d46a135 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.schema;
import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.PREFIX_FLAG;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.lang.InternalTuple;
@@ -51,6 +52,25 @@ public class BinaryTuplePrefix extends BinaryTupleReader
implements InternalTupl
super(elementCount, buffer);
}
+ /**
+ * Creates a prefix from the provided {@link BinaryTuple}. If the given tuple has
fewer columns than, or as many columns as, requested,
+ * then all of its elements will be used in the resulting prefix. If the given
tuple has more columns, then
+ * the excess columns will be truncated.
+ *
+ * @param numElements Number of elements in full schema of prefix.
+ * @param tuple Tuple to create a prefix from.
+ * @return Prefix created from the provided tuple with regard to the desired
number of elements.
+ */
+ public static BinaryTuplePrefix fromBinaryTuple(int numElements,
BinaryTuple tuple) {
+ if (numElements == tuple.elementCount()) {
+ return entireTuple(tuple);
+ } else if (numElements > tuple.elementCount()) {
+ return expandTuple(numElements, tuple);
+ } else {
+ return truncateTuple(numElements, tuple);
+ }
+ }
+
/**
* Creates a prefix that contains all columns from the provided {@link
BinaryTuple}.
*
@@ -58,6 +78,10 @@ public class BinaryTuplePrefix extends BinaryTupleReader
implements InternalTupl
* @return Prefix, equivalent to the tuple.
*/
public static BinaryTuplePrefix fromBinaryTuple(BinaryTuple tuple) {
+ return entireTuple(tuple);
+ }
+
+ private static BinaryTuplePrefix entireTuple(BinaryTuple tuple) {
ByteBuffer tupleBuffer = tuple.byteBuffer();
ByteBuffer prefixBuffer = ByteBuffer.allocate(tupleBuffer.remaining()
+ Integer.BYTES)
@@ -73,6 +97,84 @@ public class BinaryTuplePrefix extends BinaryTupleReader
implements InternalTupl
return new BinaryTuplePrefix(tuple.elementCount(), prefixBuffer);
}
+ private static BinaryTuplePrefix expandTuple(int numElements, BinaryTuple
tuple) {
+ assert numElements > tuple.elementCount();
+
+ if (tuple.elementCount() == 0) {
+ return new BinaryTuplePrefix(
+ numElements, new BinaryTuplePrefixBuilder(0, numElements,
0).build()
+ );
+ }
+
+ int[] dataBeginOffsetHolder = new int[1];
+ int[] dataEndOffsetHolder = new int[1];
+
+ tuple.fetch(0, (index, begin, end) -> dataBeginOffsetHolder[0] =
begin);
+ tuple.fetch(tuple.elementCount() - 1, (index, begin, end) ->
dataEndOffsetHolder[0] = end);
+
+ ByteBuffer tupleBuffer = tuple.byteBuffer();
+
+ byte flags = tupleBuffer.get(0);
+ int entrySize = BinaryTupleCommon.flagsToEntrySize(flags);
+
+ int newTupleSize = tupleBuffer.remaining() // size of original tuple
+ // additional space for the offset map to align it with
desired number of columns
+ + (entrySize * (numElements - tuple.elementCount()))
+ + Integer.BYTES; // actual number of columns in prefix
+
+ ByteBuffer prefixBuffer = ByteBuffer.allocate(newTupleSize)
+ .order(ORDER)
+ .put(tupleBuffer.duplicate().limit(dataBeginOffsetHolder[0]));
// header
+
+ int payloadEndPosition = dataEndOffsetHolder[0] -
dataBeginOffsetHolder[0];
+ for (int idx = tuple.elementCount(); idx < numElements; idx++) {
+ switch (entrySize) {
+ case Byte.BYTES:
+ prefixBuffer.put((byte) payloadEndPosition);
+ break;
+ case Short.BYTES:
+ prefixBuffer.putShort((short) payloadEndPosition);
+ break;
+ case Integer.BYTES:
+ prefixBuffer.putInt(payloadEndPosition);
+ break;
+ default:
+ assert false;
+ }
+ }
+
+ prefixBuffer
+
.put(tupleBuffer.slice().position(dataBeginOffsetHolder[0]).limit(dataEndOffsetHolder[0]))
// payload
+ .putInt(tuple.elementCount())
+ .flip();
+
+ prefixBuffer.put(0, (byte) (flags | PREFIX_FLAG));
+
+ return new BinaryTuplePrefix(numElements, prefixBuffer);
+ }
+
+ private static BinaryTuplePrefix truncateTuple(int numElements,
BinaryTuple tuple) {
+ assert numElements < tuple.elementCount();
+
+ int[] dataBeginOffsetHolder = new int[1];
+ int[] dataEndOffsetHolder = new int[1];
+
+ tuple.fetch(0, (index, begin, end) -> dataBeginOffsetHolder[0] =
begin);
+ tuple.fetch(numElements - 1, (index, begin, end) ->
dataEndOffsetHolder[0] = end);
+
+ BinaryTuplePrefixBuilder builder = new BinaryTuplePrefixBuilder(
+ numElements, numElements, dataEndOffsetHolder[0] -
dataBeginOffsetHolder[0]
+ );
+
+ for (int i = 0; i < numElements; i++) {
+ byte[] valueBytes = tuple.bytesValue(i);
+
+ builder.appendBytes(valueBytes);
+ }
+
+ return new BinaryTuplePrefix(numElements, builder.build());
+ }
+
@Override
public int elementCount() {
ByteBuffer buffer = byteBuffer();
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
index 99acfc8770..3c9815e140 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.schema;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -25,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
import java.time.LocalDate;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.junit.jupiter.api.Test;
@@ -76,6 +78,81 @@ public class BinaryTuplePrefixTest {
assertThat(prefix.intValue(0), is(Integer.MAX_VALUE));
}
+ @Test
+ public void testCreatePrefixFromBinaryTupleWhichSizeIsLessThanRequired() {
+ int sourceTupleSize = 1;
+
+ ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+ .appendInt(10)
+ .build();
+
+ BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(4, new
BinaryTuple(sourceTupleSize, buffer));
+
+ assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+ assertThat(prefix.intValue(0), equalTo(10));
+ assertThat(prefix.hasNullValue(1), equalTo(true));
+ assertThat(prefix.hasNullValue(2), equalTo(true));
+ assertThat(prefix.hasNullValue(3), equalTo(true));
+ }
+
+ @Test
+ public void testCreatePrefixFromBinaryTupleWhichSizeIsEqualToRequired() {
+ int sourceTupleSize = 4;
+
+ ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+ .appendInt(10)
+ .appendString("foo")
+ .appendNull()
+ .appendBoolean(false)
+ .build();
+
+ BinaryTuplePrefix prefix =
BinaryTuplePrefix.fromBinaryTuple(sourceTupleSize, new
BinaryTuple(sourceTupleSize, buffer));
+
+ assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+ assertThat(prefix.intValue(0), equalTo(10));
+ assertThat(prefix.stringValue(1), equalTo("foo"));
+ assertThat(prefix.hasNullValue(2), equalTo(true));
+ assertThat(prefix.booleanValue(3), equalTo(false));
+ }
+
+ @Test
+ public void
testCreatePrefixFromBinaryTupleWhichSizeIsGreaterThanRequired() {
+ int sourceTupleSize = 5;
+
+ ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+ .appendInt(10)
+ .appendString("foo")
+ .appendNull()
+ .appendBoolean(false)
+ .appendString("truncated value")
+ .build();
+
+ int prefixSize = 4;
+
+ BinaryTuplePrefix prefix =
BinaryTuplePrefix.fromBinaryTuple(prefixSize, new BinaryTuple(sourceTupleSize,
buffer));
+
+ assertThat(prefix.elementCount(), equalTo(prefixSize));
+ assertThat(prefix.intValue(0), equalTo(10));
+ assertThat(prefix.stringValue(1), equalTo("foo"));
+ assertThat(prefix.hasNullValue(2), equalTo(true));
+ assertThat(prefix.booleanValue(3), equalTo(false));
+ }
+
+ @Test
+ public void testCreatePrefixFromZeroLengthBinaryTuple() {
+ int sourceTupleSize = 0;
+
+ ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize).build();
+
+ BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(4, new
BinaryTuple(sourceTupleSize, buffer));
+
+ assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+ assertThat(prefix.hasNullValue(0), equalTo(true));
+ assertThat(prefix.hasNullValue(1), equalTo(true));
+ assertThat(prefix.hasNullValue(2), equalTo(true));
+ assertThat(prefix.hasNullValue(3), equalTo(true));
+ }
+
/**
* Tests construction of an invalid prefix.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index 4211794044..db7ef9849f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -48,7 +48,7 @@ public interface ExchangeService extends LifecycleAware {
* which completes when the data has been sent.
*/
CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long
fragmentId, long exchangeId, int batchId, boolean last,
- List<ByteBuffer> rows);
+ List<BinaryTupleMessage> rows);
/**
* Asynchronously requests data from the specified node.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index a9f390ef94..859670c7b9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -36,6 +35,7 @@ import
org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.TraceableException;
import org.jetbrains.annotations.Nullable;
@@ -76,7 +76,7 @@ public class ExchangeServiceImpl implements ExchangeService {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long
fragmentId, long exchangeId, int batchId,
- boolean last, List<ByteBuffer> rows) {
+ boolean last, List<BinaryTupleMessage> rows) {
return messageService.send(
nodeName,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 8120ef2ea2..c08a184545 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -84,7 +84,7 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
);
InternalTable internalTable = table.internalTable();
- ScannableTable scannableTable = new
ScannableTableImpl(internalTable, converterFactory, tableDescriptor);
+ ScannableTable scannableTable = new
ScannableTableImpl(internalTable, converterFactory);
UpdatableTableImpl updatableTable = new
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
replicaService, clock,
converterFactory.create(null), schemaDescriptor);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
deleted file mode 100644
index 60fd46c3f6..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.exec;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.type.NativeTypeSpec;
-
-/**
- * Helper class provides method to convert binary tuple to rows and vice-versa.
- */
-public final class RowConverter {
- /**
- * Creates binary tuple schema for index rows.
- */
- public static BinaryTupleSchema createIndexRowSchema(List<String>
indexedColumns, TableDescriptor tableDescriptor) {
- Element[] elements = indexedColumns.stream()
- .map(tableDescriptor::columnDescriptor)
- .map(colDesc -> new Element(colDesc.physicalType(), true))
- .toArray(Element[]::new);
-
- return BinaryTupleSchema.create(elements);
- }
-
- /**
- * Converts a search row, which represents prefix condition, to a binary
tuple.
- *
- * @param ectx Execution context.
- * @param binarySchema Binary tuple schema.
- * @param factory Row handler factory.
- * @param searchRow Search row.
- * @param <RowT> Row type.
- * @return Binary tuple.
- */
- static <RowT> BinaryTuplePrefix toBinaryTuplePrefix(
- ExecutionContext<RowT> ectx,
- BinaryTupleSchema binarySchema,
- RowHandler.RowFactory<RowT> factory,
- RowT searchRow
- ) {
- RowHandler<RowT> handler = factory.handler();
-
- int indexedColumnsCount = binarySchema.elementCount();
- int prefixColumnsCount = handler.columnCount(searchRow);
-
- assert binarySchema.elementCount() >= prefixColumnsCount : "Invalid
range condition";
-
- BinaryTuplePrefixBuilder tupleBuilder = new
BinaryTuplePrefixBuilder(prefixColumnsCount, indexedColumnsCount);
-
- return new BinaryTuplePrefix(indexedColumnsCount,
toByteBuffer(binarySchema, handler, tupleBuilder, searchRow));
- }
-
- /**
- * Converts a search row, which represents exact value condition, to a
binary tuple.
- *
- * @param ectx Execution context.
- * @param binarySchema Binary tuple schema.
- * @param factory Row handler factory.
- * @param searchRow Search row.
- * @param <RowT> Row type.
- * @return Binary tuple.
- */
- public static <RowT> BinaryTuple toBinaryTuple(
- ExecutionContext<RowT> ectx,
- BinaryTupleSchema binarySchema,
- RowHandler.RowFactory<RowT> factory,
- RowT searchRow
- ) {
- RowHandler<RowT> handler = factory.handler();
-
- int rowColumnsCount = handler.columnCount(searchRow);
-
- assert rowColumnsCount == binarySchema.elementCount() : "Invalid
lookup key.";
-
- BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(rowColumnsCount);
-
- return new BinaryTuple(rowColumnsCount, toByteBuffer(binarySchema,
handler, tupleBuilder, searchRow));
- }
-
- private static <RowT> ByteBuffer toByteBuffer(
- BinaryTupleSchema binarySchema,
- RowHandler<RowT> handler,
- BinaryTupleBuilder tupleBuilder,
- RowT searchRow
- ) {
- int columnsCount = handler.columnCount(searchRow);
-
- for (int i = 0; i < columnsCount; i++) {
- Object val = handler.get(i, searchRow);
-
- Element element = binarySchema.element(i);
-
- val = TypeUtils.fromInternal(val,
NativeTypeSpec.toClass(element.typeSpec(), element.nullable()));
-
- BinaryRowConverter.appendValue(tupleBuilder, element, val);
- }
-
- return tupleBuilder.build();
- }
-}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index 302be058c9..8b35056635 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.jetbrains.annotations.Nullable;
@@ -64,12 +64,12 @@ public interface RowHandler<RowT> {
int columnCount(RowT row);
/**
- * Assembly row representation as ByteBuffer.
+ * Assembles row representation as a BinaryTuple.
*
* @param row Incoming data to be processed.
- * @return {@link ByteBuffer} representation.
+ * @return {@link BinaryTuple} representation.
*/
- ByteBuffer toByteBuffer(RowT row);
+ BinaryTuple toBinaryTuple(RowT row);
/** String representation. */
String toString(RowT row);
@@ -96,14 +96,6 @@ public interface RowHandler<RowT> {
*/
RowT create(Object... fields);
- /**
- * Create row using incoming {@link ByteBuffer}.
- *
- * @param raw {@link ByteBuffer} representation.
- * @return Instantiation defined representation.
- */
- RowT create(ByteBuffer raw);
-
/**
* Create row using incoming binary tuple.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index cfe637fa27..e0477f4515 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
@@ -27,10 +28,8 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -45,13 +44,10 @@ public class ScannableTableImpl implements ScannableTable {
private final TableRowConverterFactory converterFactory;
- private final TableDescriptor tableDescriptor;
-
/** Constructor. */
- public ScannableTableImpl(InternalTable internalTable,
TableRowConverterFactory converterFactory, TableDescriptor tableDescriptor) {
+ public ScannableTableImpl(InternalTable internalTable,
TableRowConverterFactory converterFactory) {
this.internalTable = internalTable;
this.converterFactory = converterFactory;
- this.tableDescriptor = tableDescriptor;
}
/** {@inheritDoc} */
@@ -90,9 +86,8 @@ public class ScannableTableImpl implements ScannableTable {
@Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns
) {
-
- BinaryTupleSchema indexRowSchema =
RowConverter.createIndexRowSchema(columns, tableDescriptor);
TxAttributes txAttributes = ctx.txAttributes();
+ RowHandler<RowT> handler = rowFactory.handler();
Publisher<BinaryRow> pub;
BinaryTuplePrefix lower;
@@ -105,17 +100,21 @@ public class ScannableTableImpl implements ScannableTable
{
lower = null;
upper = null;
} else {
- lower = toBinaryTuplePrefix(ctx, indexRowSchema, cond.lower(),
rowFactory);
- upper = toBinaryTuplePrefix(ctx, indexRowSchema, cond.upper(),
rowFactory);
+ lower = toBinaryTuplePrefix(columns.size(), handler, cond.lower());
+ upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper());
flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : 0;
flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : 0;
}
if (txAttributes.readOnly()) {
+ HybridTimestamp readTime = txAttributes.time();
+
+ assert readTime != null;
+
pub = internalTable.scan(
partWithTerm.partId(),
- txAttributes.time(),
+ readTime,
ctx.localNode(),
indexId,
lower,
@@ -148,24 +147,31 @@ public class ScannableTableImpl implements ScannableTable
{
PartitionWithTerm partWithTerm,
RowFactory<RowT> rowFactory,
int indexId,
- List<String> columns, RowT key,
+ List<String> columns,
+ RowT key,
@Nullable BitSet requiredColumns
) {
-
- BinaryTupleSchema indexRowSchema =
RowConverter.createIndexRowSchema(columns, tableDescriptor);
TxAttributes txAttributes = ctx.txAttributes();
+ RowHandler<RowT> handler = rowFactory.handler();
Publisher<BinaryRow> pub;
- BinaryTuple keyTuple = toBinaryTuple(ctx, indexRowSchema, key,
rowFactory);
+ BinaryTuple keyTuple = handler.toBinaryTuple(key);
+
+ assert keyTuple.elementCount() == columns.size()
+ : format("Key should contain exactly {} fields, but was {}",
columns.size(), handler.toString(key));
if (txAttributes.readOnly()) {
+ HybridTimestamp readTime = txAttributes.time();
+
+ assert readTime != null;
+
pub = internalTable.lookup(
partWithTerm.partId(),
- txAttributes.time(),
+ readTime,
ctx.localNode(),
indexId,
keyTuple,
- requiredColumns
+ null
);
} else {
pub = internalTable.lookup(
@@ -174,7 +180,7 @@ public class ScannableTableImpl implements ScannableTable {
new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
indexId,
keyTuple,
- requiredColumns
+ null
);
}
@@ -184,25 +190,16 @@ public class ScannableTableImpl implements ScannableTable
{
}
private static <RowT> @Nullable BinaryTuplePrefix toBinaryTuplePrefix(
- ExecutionContext<RowT> ctx,
- BinaryTupleSchema indexRowSchema,
- @Nullable RowT condition,
- RowFactory<RowT> factory
+ int searchBoundSize,
+ RowHandler<RowT> handler,
+ @Nullable RowT prefix
) {
- if (condition == null) {
+ if (prefix == null || handler.columnCount(prefix) == 0) {
return null;
}
- return RowConverter.toBinaryTuplePrefix(ctx, indexRowSchema, factory,
condition);
- }
+ assert searchBoundSize >= handler.columnCount(prefix) : "Invalid range
condition";
- private static <RowT> @Nullable BinaryTuple
toBinaryTuple(ExecutionContext<RowT> ctx, BinaryTupleSchema indexRowSchema,
- @Nullable RowT condition, RowFactory<RowT> factory) {
- if (condition == null) {
- return null;
- }
-
- return RowConverter.toBinaryTuple(ctx, indexRowSchema, factory,
condition);
+ return BinaryTuplePrefix.fromBinaryTuple(searchBoundSize,
handler.toBinaryTuple(prefix));
}
-
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
index 4551d3c232..5ac300e8c1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -29,6 +28,7 @@ import java.time.LocalTime;
import java.util.BitSet;
import java.util.List;
import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.lang.InternalTuple;
@@ -55,12 +55,11 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*
* <p>Each kind of rows is serialized to the same binary tuple format
- * using the {@link #toByteBuffer(RowWrapper) toByteBuffer} method.
+ * using the {@link #toBinaryTuple(RowWrapper)} method.
*
- * <p>Factory methods {@link RowFactory#create(InternalTuple)
wrap(InternalTuple)} and
- * {@link RowFactory#create(ByteBuffer) create(ByteBuffer)} allow create rows
without
- * any additional conversions. But the fields in binary tuple must match the
- * factory {@link RowSchema row schema}.
+ * <p>Factory method {@link RowFactory#create(InternalTuple)} allows
+ * creating rows without any additional conversions. However, the fields in
+ * the binary tuple must match the factory {@link RowSchema row schema}.
*/
public class SqlRowHandler implements RowHandler<RowWrapper> {
public static final RowHandler<RowWrapper> INSTANCE = new SqlRowHandler();
@@ -138,8 +137,8 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
}
@Override
- public ByteBuffer toByteBuffer(RowWrapper row) {
- return row.toByteBuffer();
+ public BinaryTuple toBinaryTuple(RowWrapper row) {
+ return row.toBinaryTuple();
}
@Override
@@ -167,12 +166,6 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
return new ObjectsArrayRowWrapper(rowSchema, fields);
}
- /** {@inheritDoc} */
- @Override
- public RowWrapper create(ByteBuffer buf) {
- return create(new BinaryTuple(schemaLen, buf));
- }
-
/** {@inheritDoc} */
@Override
public RowWrapper create(InternalTuple tuple) {
@@ -193,9 +186,9 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
abstract @Nullable Object get(int field);
- abstract void set(int field, Object value);
+ abstract void set(int field, @Nullable Object value);
- abstract ByteBuffer toByteBuffer();
+ abstract BinaryTuple toBinaryTuple();
}
/**
@@ -226,8 +219,45 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
}
@Override
- ByteBuffer toByteBuffer() {
- BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(row.length);
+ BinaryTuple toBinaryTuple() {
+ int estimatedSize = 0;
+ boolean exactEstimate = true;
+ for (int i = 0; i < row.length; i++) {
+ NativeType nativeType =
RowSchemaTypes.toNativeType(rowSchema.fields().get(i));
+
+ if (nativeType == null) {
+ assert row[i] == null;
+
+ continue;
+ }
+
+ Object value = row[i];
+
+ if (value == null) {
+ continue;
+ }
+
+ if (nativeType.spec().fixedLength()) {
+ estimatedSize += nativeType.sizeInBytes();
+ } else {
+ if (value instanceof String) {
+ // Every character in the string may occupy up to 4
bytes.
+ // Let's be optimistic here and reserve the buffer only
for the smallest
+ // possible variant.
+
+ estimatedSize += ((String) value).length();
+ exactEstimate = false;
+ } else if (value instanceof ByteString) {
+ estimatedSize += ((ByteString) value).length();
+ } else {
+ assert (value instanceof BigDecimal) || (value
instanceof BigInteger) : "unexpected value " + value.getClass();
+
+ exactEstimate = false;
+ }
+ }
+ }
+
+ BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(row.length, estimatedSize, exactEstimate);
for (int i = 0; i < row.length; i++) {
Object value = row[i];
@@ -235,7 +265,7 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
appendValue(tupleBuilder, rowSchema.fields().get(i), value);
}
- return tupleBuilder.build();
+ return new BinaryTuple(row.length, tupleBuilder.build());
}
@Override
@@ -243,7 +273,7 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
return rowSchema;
}
- private void appendValue(BinaryTupleBuilder builder, TypeSpec
schemaType, @Nullable Object value) {
+ private static void appendValue(BinaryTupleBuilder builder, TypeSpec
schemaType, @Nullable Object value) {
if (value == null) {
builder.appendNull();
@@ -374,8 +404,12 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
}
@Override
- ByteBuffer toByteBuffer() {
- return tuple.byteBuffer();
+ BinaryTuple toBinaryTuple() {
+ if (tuple instanceof BinaryTuple) {
+ return (BinaryTuple) tuple;
+ }
+
+ return new BinaryTuple(tuple.elementCount(), tuple.byteBuffer());
}
@Override
@@ -383,7 +417,7 @@ public class SqlRowHandler implements
RowHandler<RowWrapper> {
return rowSchema;
}
- private @Nullable Object readValue(InternalTuple tuple, NativeType
nativeType, int fieldIndex) {
+ private static @Nullable Object readValue(InternalTuple tuple,
NativeType nativeType, int fieldIndex) {
switch (nativeType.spec()) {
case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
case INT8: return tuple.byteValueBoxed(fieldIndex);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index ba70a02e78..c3fcb8928c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.calcite.util.Util.unexpected;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -39,6 +38,7 @@ import
org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.jetbrains.annotations.Nullable;
@@ -163,15 +163,15 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
* @param last Last batch flag.
* @param rows Rows.
*/
- public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<ByteBuffer> rows) throws Exception {
+ public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<BinaryTupleMessage> rows) throws Exception {
RemoteSource<RowT> source = perNodeBuffers.get(srcNodeName);
boolean waitingBefore = source.check() == State.WAITING;
List<RowT> rows0 = new ArrayList<>(rows.size());
- for (ByteBuffer row : rows) {
- rows0.add(rowFactory.create(row));
+ for (BinaryTupleMessage row : rows) {
+ rows0.add(rowFactory.create(row.asBinaryTuple()));
}
source.onBatchReceived(batchId, last, rows0);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index fb6560c60f..0f6e387ab1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -32,6 +31,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.jetbrains.annotations.Nullable;
@@ -48,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>,
SingleNode<RowT>, Downstream<RowT> {
private static final IgniteLogger LOG = Loggers.forClass(Outbox.class);
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
private final long exchangeId;
private final long targetFragmentId;
@@ -236,10 +239,17 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
private void sendBatch(String nodeName, int batchId, boolean last,
List<RowT> rows) {
RowHandler<RowT> handler = context().rowHandler();
- List<ByteBuffer> rows0 = new ArrayList<>(rows.size());
+ List<BinaryTupleMessage> rows0 = new ArrayList<>(rows.size());
for (RowT row : rows) {
- rows0.add(handler.toByteBuffer(row));
+ BinaryTuple tuple = handler.toBinaryTuple(row);
+
+ rows0.add(
+ TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+ .elementCount(tuple.elementCount())
+ .tuple(tuple.byteBuffer())
+ .build()
+ );
}
exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId,
batchId, last, rows0)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
index b1cc051270..7174aee040 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.sql.engine.message;
-import java.nio.ByteBuffer;
import java.util.List;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -45,5 +45,5 @@ public interface QueryBatchMessage extends
ExecutionContextAwareMessage {
/**
* Get rows.
*/
- List<ByteBuffer> rows();
+ List<BinaryTupleMessage> rows();
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 49947228b1..fb3a07f4b2 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
-import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
@@ -56,7 +55,6 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.NetworkAddress;
@@ -355,11 +353,6 @@ public abstract class AbstractExecutionTest<T> extends
IgniteAbstractTest {
return fields;
}
- @Override
- public Object[] create(ByteBuffer raw) {
- return ByteUtils.fromBytes(raw.array());
- }
-
@Override
public Object[] create(InternalTuple tuple) {
throw new UnsupportedOperationException();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 5a1569f007..c2100b174b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -49,6 +49,7 @@ import
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
@@ -62,6 +63,7 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import
org.apache.ignite.internal.testframework.IgniteTestUtils.PredicateMatcher;
import
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
import
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
@@ -93,9 +95,18 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
private static final Map<String, QueryTaskExecutor> executors = new
HashMap<>();
- public static final CustomMatcher<Object[]> ODD_KEY_MATCHER = new
PredicateMatcher<>(e -> ((int) (e[0])) % 2 != 0, "odd key");
+ private static final CustomMatcher<Object[]> ODD_KEY_MATCHER = new
PredicateMatcher<>(e -> ((int) (e[0])) % 2 != 0, "odd key");
- public static final CustomMatcher<Object[]> EVEN_KEY_MATCHER = new
PredicateMatcher<>(e -> ((int) (e[0])) % 2 == 0, "even key");
+ private static final CustomMatcher<Object[]> EVEN_KEY_MATCHER = new
PredicateMatcher<>(e -> ((int) (e[0])) % 2 == 0, "even key");
+
+ /**
+ * Schema of the rows used in the tests. All data providers created within
this test class must
+ * conform to this row schema.
+ */
+ private static final RowSchema ROW_SCHEMA = RowSchema.builder()
+ .addField(NativeTypes.INT32)
+ .addField(NativeTypes.INT32)
+ .build();
private final Map<String, MailboxRegistry> mailboxes = new HashMap<>();
private final Map<String, ExchangeService> exchangeServices = new
HashMap<>();
@@ -496,7 +507,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
createExchangeService(taskExecutor,
serviceFactory.forNode(localNode.name()), mailboxRegistry));
Inbox<Object[]> inbox = new Inbox<>(
- targetCtx, exchangeService, mailboxRegistry, sourceNodeNames,
comparator, rowFactory(),
+ targetCtx, exchangeService, mailboxRegistry, sourceNodeNames,
comparator, rowHandler().factory(ROW_SCHEMA),
SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 3efdb7697b..0681bed511 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -26,26 +26,22 @@ import static org.apache.calcite.rel.core.JoinRelType.LEFT;
import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
import static org.apache.calcite.rel.core.JoinRelType.SEMI;
import static
org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
-import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -127,30 +123,6 @@ public class ExecutionTest extends
AbstractExecutionTest<Object[]> {
assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
}
- @Test
- public void testRowFactoryAssembly() {
- ExecutionContext<Object[]> ctx = executionContext(false);
-
- RelDataType rowType = TypeUtils.createRowType(ctx.getTypeFactory(),
- TypeUtils.native2relationalTypes(ctx.getTypeFactory(),
NativeTypes.INT32, NativeTypes.STRING, NativeTypes.BOOLEAN));
-
- RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
-
- RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
-
- Object[] row1 = rowFactory.create();
-
- ctx.rowHandler().set(0, row1, 1);
- ctx.rowHandler().set(1, row1, "2");
- ctx.rowHandler().set(2, row1, false);
-
- ByteBuffer bb = ctx.rowHandler().toByteBuffer(row1);
-
- Object[] row2 = rowFactory.create(bb);
-
- assertArrayEquals(row1, row2);
- }
-
@Test
public void testUnionAll() {
ExecutionContext<Object[]> ctx = executionContext(true);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 201298c0c5..83f9a928cf 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -69,9 +69,6 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
-import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.table.InternalTable;
@@ -491,7 +488,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
eq(tx.clusterNode()),
eq(indexId),
any(BinaryTuple.class),
- eq(tester.requiredFields)
+ eq(null)
);
} else {
PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
@@ -502,7 +499,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
eq(primaryReplica),
eq(indexId),
any(BinaryTuple.class),
- eq(tester.requiredFields)
+ eq(null)
);
}
@@ -549,8 +546,6 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
private class Tester {
- final TableDescriptor tableDescriptor;
-
final ScannableTable scannableTable;
final TestInput input;
@@ -562,8 +557,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester(TestInput input) {
this.input = input;
rowConverter = new RowCollectingTableRwoConverter(input);
- tableDescriptor = new
TestTableDescriptor(IgniteDistributions::single, input.rowType);
- scannableTable = new ScannableTableImpl(internalTable, rf ->
rowConverter, tableDescriptor);
+ scannableTable = new ScannableTableImpl(internalTable, rf ->
rowConverter);
}
ResultCollector tableScan(int partitionId, long term, NoOpTransaction
tx) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 964c23726b..cd470cae7f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -52,9 +52,6 @@ import
org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
import org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -136,8 +133,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
return (RowT) TestInternalTableImpl.ROW;
}
};
- TableDescriptor descriptor = new
TestTableDescriptor(IgniteDistributions::single, rowType);
- ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter, descriptor);
+ ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter);
TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scanableTable,
partsWithTerms, null, null, null);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
index a876b91a74..ecf4fecbbf 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.sql.engine.exec.row;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.generateValueByType;
import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType2NativeType;
+import static org.apache.ignite.internal.sql.engine.util.TypeUtils.toInternal;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -73,25 +73,19 @@ public class SqlRowHandlerTest extends IgniteAbstractTest {
int elementsCount = schema.fields().size();
RowFactory<RowWrapper> factory = handler.factory(schema);
- RowWrapper src = factory.create(sourceData);
+ RowWrapper src = factory.create(wrap(sourceData));
// Serialization to binary tuple representation.
- ByteBuffer buf = handler.toByteBuffer(src);
-
- BinaryTuple tuple = new BinaryTuple(elementsCount, buf);
- RowWrapper destWrap = factory.create(tuple);
- RowWrapper dest = factory.create(buf);
+ BinaryTuple tuple = handler.toBinaryTuple(src);
+ RowWrapper dest = factory.create(tuple);
for (int i = 0; i < elementsCount; i++) {
String msg = schema.fields().get(i).toString();
- assertThat(msg, handler.get(i, src), equalTo(sourceData[i]));
-
- // Binary tuple wrapper must return data in internal format.
- Object expected = TypeUtils.toInternal(sourceData[i]);
+ Object expected = toInternal(sourceData[i]);
+ assertThat(msg, handler.get(i, src), equalTo(expected));
assertThat(msg, handler.get(i, dest), equalTo(expected));
- assertThat(msg, handler.get(i, destWrap), equalTo(expected));
}
}
@@ -116,10 +110,10 @@ public class SqlRowHandlerTest extends IgniteAbstractTest
{
RowSchema concatenatedSchema = builder.build();
// Serialize.
- ByteBuffer buf = handler.toByteBuffer(concatenated);
+ BinaryTuple tuple = handler.toBinaryTuple(concatenated);
// Wrap into row.
- RowWrapper result = handler.factory(concatenatedSchema).create(new
BinaryTuple(totalElementsCount, buf));
+ RowWrapper result = handler.factory(concatenatedSchema).create(tuple);
for (int i = 0; i < leftLen; i++) {
assertThat(handler.get(i, result),
equalTo(TypeUtils.toInternal(params.leftData[i])));
@@ -149,13 +143,13 @@ public class SqlRowHandlerTest extends IgniteAbstractTest
{
RowFactory<RowWrapper> factory = handler.factory(schema);
RowWrapper srcRow = factory.create(sourceData);
- RowWrapper srcBinRow = factory.create(new
BinaryTuple(handler.columnCount(srcRow), handler.toByteBuffer(srcRow)));
+ RowWrapper srcBinRow = factory.create(handler.toBinaryTuple(srcRow));
RowWrapper mappedRow = handler.map(srcRow, mapping);
RowWrapper mappedFromBinRow = handler.map(srcBinRow, mapping);
RowSchema mappedSchema = rowSchema(columnTypes.subList(0,
mapping.length), Arrays.copyOf(sourceData, mapping.length));
- RowWrapper deserializedMappedBinRow =
handler.factory(mappedSchema).create(handler.toByteBuffer(mappedFromBinRow));
+ RowWrapper deserializedMappedBinRow =
handler.factory(mappedSchema).create(handler.toBinaryTuple(mappedFromBinRow));
assertThat(handler.columnCount(mappedRow), equalTo(mapping.length));
assertThat(handler.columnCount(mappedFromBinRow),
equalTo(mapping.length));
@@ -213,6 +207,16 @@ public class SqlRowHandlerTest extends IgniteAbstractTest {
return values;
}
+ private static Object[] wrap(Object[] values) {
+ Object[] newValues = new Object[values.length];
+
+ for (int i = 0; i < values.length; i++) {
+ newValues[i] = toInternal(values[i]);
+ }
+
+ return newValues;
+ }
+
private List<ColumnType> columnTypes() {
List<ColumnType> columnTypes = new ArrayList<>(
// TODO Include ignored types to test after
https://issues.apache.org/jira/browse/IGNITE-15200
@@ -244,11 +248,11 @@ public class SqlRowHandlerTest extends IgniteAbstractTest
{
RowFactory<RowWrapper> factory1 = handler.factory(leftSchema);
RowFactory<RowWrapper> factory2 = handler.factory(rightSchema);
- RowWrapper left = factory1.create(leftData);
- RowWrapper right = factory2.create(rightData);
+ RowWrapper left = factory1.create(wrap(leftData));
+ RowWrapper right = factory2.create(wrap(rightData));
- this.left = leftTupleRequired ?
factory1.create(handler.toByteBuffer(left)) : left;
- this.right = rightTupleRequired ?
factory2.create(handler.toByteBuffer(right)) : right;
+ this.left = leftTupleRequired ?
factory1.create(handler.toBinaryTuple(left)) : left;
+ this.right = rightTupleRequired ?
factory2.create(handler.toBinaryTuple(right)) : right;
}
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
index 21c9f69e9f..c80817c63b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
@@ -17,13 +17,29 @@
package org.apache.ignite.internal.sql.engine.framework;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Arrays;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchemaTypes;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.DecimalNativeType;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.jetbrains.annotations.Nullable;
/**
* Handler for rows that are implemented as a simple object array.
@@ -71,9 +87,14 @@ public class ArrayRowHandler implements RowHandler<Object[]>
{
}
@Override
- public ByteBuffer toByteBuffer(Object[] row) {
- byte[] raw = ByteUtils.toBytes(row);
- return ByteBuffer.wrap(raw);
+ public BinaryTuple toBinaryTuple(Object[] row) {
+ BinaryTupleBuilder tupleBuilder = new BinaryTupleBuilder(row.length);
+
+ for (Object value : row) {
+ appendValue(tupleBuilder, value);
+ }
+
+ return new BinaryTuple(row.length, tupleBuilder.build());
}
/** {@inheritDoc} */
@@ -108,17 +129,137 @@ public class ArrayRowHandler implements
RowHandler<Object[]> {
return fields;
}
- /** {@inheritDoc} */
- @Override
- public Object[] create(ByteBuffer raw) {
- return ByteUtils.fromBytes(raw.array());
- }
-
/** {@inheritDoc} */
@Override
public Object[] create(InternalTuple tuple) {
- throw new UnsupportedOperationException();
+ Object[] row = new Object[tuple.elementCount()];
+
+ for (int i = 0; i < row.length; i++) {
+ NativeType nativeType =
RowSchemaTypes.toNativeType(rowSchema.fields().get(i));
+
+ if (nativeType == null) {
+ row[i] = null;
+
+ continue;
+ }
+
+ row[i] = readValue(tuple, nativeType, i);
+ }
+
+ return row;
}
};
}
+
+ private static void appendValue(BinaryTupleBuilder builder, @Nullable
Object value) {
+ if (value == null) {
+ builder.appendNull();
+
+ return;
+ }
+
+ NativeType nativeType = NativeTypes.fromObject(value);
+
+ assert nativeType != null;
+
+ value = TypeUtils.fromInternal(value,
NativeTypeSpec.toClass(nativeType.spec(), true));
+
+ assert value != null : nativeType;
+
+ switch (nativeType.spec()) {
+ case BOOLEAN:
+ builder.appendBoolean((boolean) value);
+ break;
+
+ case INT8:
+ builder.appendByte((byte) value);
+ break;
+
+ case INT16:
+ builder.appendShort((short) value);
+ break;
+
+ case INT32:
+ builder.appendInt((int) value);
+ break;
+
+ case INT64:
+ builder.appendLong((long) value);
+ break;
+
+ case FLOAT:
+ builder.appendFloat((float) value);
+ break;
+
+ case DOUBLE:
+ builder.appendDouble((double) value);
+ break;
+
+ case NUMBER:
+ builder.appendNumberNotNull((BigInteger) value);
+ break;
+
+ case DECIMAL:
+ builder.appendDecimalNotNull((BigDecimal) value,
((DecimalNativeType) nativeType).scale());
+ break;
+
+ case UUID:
+ builder.appendUuidNotNull((UUID) value);
+ break;
+
+ case BYTES:
+ builder.appendBytesNotNull((byte[]) value);
+ break;
+
+ case STRING:
+ builder.appendStringNotNull((String) value);
+ break;
+
+ case BITMASK:
+ builder.appendBitmaskNotNull((BitSet) value);
+ break;
+
+ case DATE:
+ builder.appendDateNotNull((LocalDate) value);
+ break;
+
+ case TIME:
+ builder.appendTimeNotNull((LocalTime) value);
+ break;
+
+ case DATETIME:
+ builder.appendDateTimeNotNull((LocalDateTime) value);
+ break;
+
+ case TIMESTAMP:
+ builder.appendTimestampNotNull((Instant) value);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unknown type " +
nativeType);
+ }
+ }
+
+ private static @Nullable Object readValue(InternalTuple tuple, NativeType
nativeType, int fieldIndex) {
+ switch (nativeType.spec()) {
+ case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
+ case INT8: return tuple.byteValueBoxed(fieldIndex);
+ case INT16: return tuple.shortValueBoxed(fieldIndex);
+ case INT32: return tuple.intValueBoxed(fieldIndex);
+ case INT64: return tuple.longValueBoxed(fieldIndex);
+ case FLOAT: return tuple.floatValueBoxed(fieldIndex);
+ case DOUBLE: return tuple.doubleValueBoxed(fieldIndex);
+ case DECIMAL: return tuple.decimalValue(fieldIndex,
((DecimalNativeType) nativeType).scale());
+ case UUID: return tuple.uuidValue(fieldIndex);
+ case STRING: return tuple.stringValue(fieldIndex);
+ case BYTES: return tuple.bytesValue(fieldIndex);
+ case BITMASK: return tuple.bitmaskValue(fieldIndex);
+ case NUMBER: return tuple.numberValue(fieldIndex);
+ case DATE: return tuple.dateValue(fieldIndex);
+ case TIME: return tuple.timeValue(fieldIndex);
+ case DATETIME: return tuple.dateTimeValue(fieldIndex);
+ case TIMESTAMP: return tuple.timestampValue(fieldIndex);
+ default: throw new InvalidTypeException("Unknown element type: " +
nativeType);
+ }
+ }
}