This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 59107180be IGNITE-19788 Sql. Change QueryBatchMessage serialization
(#2476)
59107180be is described below
commit 59107180be987aa0a4f56ea7bf5d4688756947fb
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Mon Aug 28 14:26:40 2023 +0300
IGNITE-19788 Sql. Change QueryBatchMessage serialization (#2476)
---
.../src/integrationTest/sql/sqlite/join/join1.test | 4 +-
.../internal/sql/engine/exec/ArrayRowHandler.java | 14 +++++++
.../internal/sql/engine/exec/ExchangeService.java | 6 +--
.../sql/engine/exec/ExchangeServiceImpl.java | 11 +++---
.../sql/engine/exec/LogicalRelImplementor.java | 8 +++-
.../internal/sql/engine/exec/RowHandler.java | 43 +++++++++++++++++++++-
.../sql/engine/exec/exp/ExpressionFactoryImpl.java | 4 +-
.../engine/exec/exp/agg/AccumulatorsFactory.java | 2 +-
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 16 +++++++-
.../internal/sql/engine/exec/rel/Outbox.java | 12 +++++-
.../internal/sql/engine/externalize/RelJson.java | 4 +-
.../sql/engine/message/QueryBatchMessage.java | 5 +--
.../sql/engine/exec/rel/AbstractExecutionTest.java | 7 ++++
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 3 +-
.../sql/engine/exec/rel/ExecutionTest.java | 27 ++++++++++++++
15 files changed, 140 insertions(+), 26 deletions(-)
diff --git a/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
b/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
index 51d0795f61..7b6c043d59 100644
--- a/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
+++ b/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
@@ -145,12 +145,10 @@ SELECT * FROM t1 INNER JOIN t2 USING(b,c) ORDER BY t1.a;
statement error
SELECT * FROM t1 NATURAL JOIN t2 ON t1.a=t2.b;
-----
statement error
SELECT * FROM t1 JOIN t2 USING(a);
-----
statement error
SELECT * FROM t1 INNER OUTER JOIN t2;
-----
+
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
index e732aa704d..13d4ac5bd2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.sql.engine.exec;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
/**
* Handler for rows that implemented as a simple objects array.
@@ -54,6 +56,12 @@ public class ArrayRowHandler implements RowHandler<Object[]>
{
return row.length;
}
+ @Override
+ public ByteBuffer toByteBuffer(Object[] row) {
+ byte[] raw = ByteUtils.toBytes(row);
+ return ByteBuffer.wrap(raw);
+ }
+
/** {@inheritDoc} */
@Override
public String toString(Object[] objects) {
@@ -85,6 +93,12 @@ public class ArrayRowHandler implements RowHandler<Object[]>
{
return fields;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Object[] create(ByteBuffer raw) {
+ return ByteUtils.fromBytes(raw.array());
+ }
};
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index e038afd632..4211794044 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -43,12 +44,11 @@ public interface ExchangeService extends LifecycleAware {
* @param batchId The ID of the batch to which the data belongs.
* @param last Indicates whether this is the last batch of data to be sent.
* @param rows The data to be sent.
- * @param <RowT> The type of the rows int the batch.
* @return A {@link CompletableFuture future} representing the result of
operation,
* which completes when the data has been sent.
*/
- <RowT> CompletableFuture<Void> sendBatch(String nodeName, UUID queryId,
long fragmentId, long exchangeId, int batchId, boolean last,
- List<RowT> rows);
+ CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long
fragmentId, long exchangeId, int batchId, boolean last,
+ List<ByteBuffer> rows);
/**
* Asynchronously requests data from the specified node.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index c616389368..b601cb329b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -33,7 +34,6 @@ import
org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.TraceableException;
@@ -74,8 +74,9 @@ public class ExchangeServiceImpl implements ExchangeService {
/** {@inheritDoc} */
@Override
- public <RowT> CompletableFuture<Void> sendBatch(String nodeName, UUID
qryId, long fragmentId, long exchangeId, int batchId,
- boolean last, List<RowT> rows) {
+ public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long
fragmentId, long exchangeId, int batchId,
+ boolean last, List<ByteBuffer> rows) {
+
return messageService.send(
nodeName,
FACTORY.queryBatchMessage()
@@ -84,7 +85,7 @@ public class ExchangeServiceImpl implements ExchangeService {
.exchangeId(exchangeId)
.batchId(batchId)
.last(last)
- .rows(Commons.cast(rows))
+ .rows(rows)
.build()
);
}
@@ -162,7 +163,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
if (inbox != null) {
try {
- inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(),
Commons.cast(msg.rows()));
+ inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(),
msg.rows());
} catch (Throwable e) {
inbox.onError(e);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index d894685874..eab01147f6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -620,9 +620,15 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteReceiver rel) {
+ RelDataType rowType = rel.getRowType();
+
+ RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+
+ RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
+
Inbox<RowT> inbox = new Inbox<>(ctx, exchangeSvc, mailboxRegistry,
ctx.remotes(rel.exchangeId()),
expressionFactory.comparator(rel.collation()),
- rel.exchangeId(), rel.sourceFragmentId());
+ rowFactory, rel.exchangeId(), rel.sourceFragmentId());
mailboxRegistry.register(inbox);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index de8d4a7b2b..28ccfafe8b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
+import java.nio.ByteBuffer;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.jetbrains.annotations.Nullable;
@@ -24,29 +25,67 @@ import org.jetbrains.annotations.Nullable;
* Universal accessor and mutator for rows. It also has factory methods.
*/
public interface RowHandler<RowT> {
+ /**
+ * Extract appropriate field.
+ *
+ * @param field Field position to be processed.
+ * @param row Object to be extracted from.
+ */
@Nullable Object get(int field, RowT row);
+ /** Set incoming row field.
+ *
+ * @param field Field position to be processed.
+ * @param row Row which field need to be changed.
+ * @param val Value which should be set.
+ */
void set(int field, RowT row, @Nullable Object val);
+ /** Concatenate two rows. */
RowT concat(RowT left, RowT right);
+ /** Return column count contained in the incoming row. */
int columnCount(RowT row);
+ /**
+ * Assembly row representation as ByteBuffer.
+ *
+ * @param row Incoming data to be processed.
+ * @return {@link ByteBuffer} representation.
+ */
+ ByteBuffer toByteBuffer(RowT row);
+
+ /** String representation. */
String toString(RowT row);
/** Creates a factory that produces rows with fields defined by the given
schema. */
RowFactory<RowT> factory(RowSchema rowSchema);
/**
- * RowFactory interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Provide methods for inner row assembly.
*/
@SuppressWarnings("PublicInnerClass")
interface RowFactory<RowT> {
+ /** Return row accessor and mutator implementation. */
RowHandler<RowT> handler();
+ /** Create empty row. */
RowT create();
+ /**
+ * Create row using incoming objects.
+ *
+ * @param fields Sequential objects definitions output row will be
created from.
+ * @return Instantiation defined representation.
+ */
RowT create(Object... fields);
+
+ /**
+ * Create row using incoming {@link ByteBuffer}.
+ *
+ * @param raw {@link ByteBuffer} representation.
+ * @return Instantiation defined representation.
+ */
+ RowT create(ByteBuffer raw);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index f1416671a2..ed38d522c3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -646,7 +646,7 @@ public class ExpressionFactoryImpl<RowT> implements
ExpressionFactory<RowT> {
public boolean test(RowT r) {
scalar.execute(ctx, r, out);
- return Boolean.TRUE == hnd.get(0, out);
+ return Boolean.TRUE.equals(hnd.get(0, out));
}
}
@@ -662,7 +662,7 @@ public class ExpressionFactoryImpl<RowT> implements
ExpressionFactory<RowT> {
@Override
public boolean test(RowT r1, RowT r2) {
scalar.execute(ctx, r1, r2, out);
- return Boolean.TRUE == hnd.get(0, out);
+ return Boolean.TRUE.equals(hnd.get(0, out));
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index 3ab0b36f6d..c14fb4255d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -280,7 +280,7 @@ public class AccumulatorsFactory<RowT> implements
Supplier<List<AccumulatorWrapp
/** {@inheritDoc} */
@Override
public void add(RowT row) {
- if (type != AggregateType.REDUCE && filterArg >= 0 && Boolean.TRUE
!= handler.get(filterArg, row)) {
+ if (type != AggregateType.REDUCE && filterArg >= 0 &&
!Boolean.TRUE.equals(handler.get(filterArg, row))) {
return;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 3fa7f8974a..992dd2e259 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.calcite.util.Util.unexpected;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -33,6 +34,7 @@ import
org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -52,6 +54,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private final Collection<String> srcNodeNames;
private final @Nullable Comparator<RowT> comp;
private final Map<String, RemoteSource<RowT>> perNodeBuffers;
+ private final RowFactory<RowT> rowFactory;
private @Nullable List<RemoteSource<RowT>> remoteSources;
private int requested;
@@ -63,6 +66,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
* @param ctx Execution context.
* @param exchange Exchange service.
* @param registry Mailbox registry.
+ * @param rowFactory Incoming row factory.
* @param exchangeId Exchange ID.
* @param srcFragmentId Source fragment ID.
*/
@@ -72,6 +76,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
MailboxRegistry registry,
Collection<String> srcNodeNames,
@Nullable Comparator<RowT> comp,
+ RowFactory<RowT> rowFactory,
long exchangeId,
long srcFragmentId
) {
@@ -83,6 +88,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
this.registry = registry;
this.srcNodeNames = srcNodeNames;
this.comp = comp;
+ this.rowFactory = rowFactory;
this.srcFragmentId = srcFragmentId;
this.exchangeId = exchangeId;
@@ -157,12 +163,18 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
* @param last Last batch flag.
* @param rows Rows.
*/
- public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<RowT> rows) throws Exception {
+ public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<ByteBuffer> rows) throws Exception {
RemoteSource<RowT> source = perNodeBuffers.get(srcNodeName);
boolean waitingBefore = source.check() == State.WAITING;
- source.onBatchReceived(batchId, last, rows);
+ List<RowT> rows0 = new ArrayList<>(rows.size());
+
+ for (ByteBuffer row : rows) {
+ rows0.add(rowFactory.create(row));
+ }
+
+ source.onBatchReceived(batchId, last, rows0);
if (requested > 0 && waitingBefore && source.check() != State.WAITING)
{
push();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 387a25dbbc..2dc2b3e8f5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -232,7 +234,15 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
private void sendBatch(String nodeName, int batchId, boolean last,
List<RowT> rows) {
- exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId,
batchId, last, rows)
+ RowHandler<RowT> handler = context().rowHandler();
+
+ List<ByteBuffer> rows0 = new ArrayList<>(rows.size());
+
+ for (RowT row : rows) {
+ rows0.add(handler.toByteBuffer(row));
+ }
+
+ exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId,
batchId, last, rows0)
.whenComplete((ignored, ex) -> {
if (ex == null) {
return;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
index e9353b4b10..d755d215b5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
@@ -705,7 +705,7 @@ class RelJson {
} else if (o instanceof Map) {
Map<String, Object> map = (Map<String, Object>) o;
String clazz = (String) map.get("class");
- boolean nullable = Boolean.TRUE == map.get("nullable");
+ boolean nullable = Boolean.TRUE.equals(map.get("nullable"));
if (clazz != null) {
RelDataType type =
typeFactory.createJavaType(classForName(clazz, false));
@@ -826,7 +826,7 @@ class RelJson {
// Check if it is a local ref.
if (map.containsKey("type")) {
RelDataType type = toType(typeFactory, map.get("type"));
- return map.get("dynamic") == Boolean.TRUE
+ return Boolean.TRUE.equals(map.get("dynamic"))
? rexBuilder.makeDynamicParam(type, input)
: rexBuilder.makeLocalRef(type, input);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
index 058f52cf74..b1cc051270 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.sql.engine.message;
+import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -45,6 +45,5 @@ public interface QueryBatchMessage extends
ExecutionContextAwareMessage {
/**
* Get rows.
*/
- @Marshallable
- List<Object> rows();
+ List<ByteBuffer> rows();
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 87c1b2bb38..2b2670b423 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
@@ -47,6 +48,7 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.NetworkAddress;
@@ -343,6 +345,11 @@ public abstract class AbstractExecutionTest extends
IgniteAbstractTest {
public Object[] create(Object... fields) {
return fields;
}
+
+ @Override
+ public Object[] create(ByteBuffer raw) {
+ return ByteUtils.fromBytes(raw.array());
+ }
};
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 07f230d083..dfbfa5704b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -480,7 +480,8 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest {
createExchangeService(taskExecutor,
serviceFactory.forNode(localNode.name()), mailboxRegistry));
Inbox<Object[]> inbox = new Inbox<>(
- targetCtx, exchangeService, mailboxRegistry, sourceNodeNames,
comparator, SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
+ targetCtx, exchangeService, mailboxRegistry, sourceNodeNames,
comparator, rowFactory(),
+ SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
);
mailboxRegistry.register(inbox);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index ca9bda281b..e5316694eb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -26,23 +26,27 @@ import static org.apache.calcite.rel.core.JoinRelType.LEFT;
import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
import static org.apache.calcite.rel.core.JoinRelType.SEMI;
import static
org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
+import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -119,6 +123,29 @@ public class ExecutionTest extends AbstractExecutionTest {
assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
}
+ @Test
+ public void testRowFactoryAssembly() {
+ ExecutionContext<Object[]> ctx = executionContext(false);
+
+ RelDataType rowType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, boolean.class);
+
+ RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+
+ RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
+
+ Object[] row1 = rowFactory.create();
+
+ ctx.rowHandler().set(0, row1, 1);
+ ctx.rowHandler().set(1, row1, "2");
+ ctx.rowHandler().set(2, row1, false);
+
+ ByteBuffer bb = ctx.rowHandler().toByteBuffer(row1);
+
+ Object[] row2 = rowFactory.create(bb);
+
+ assertArrayEquals(row1, row2);
+ }
+
@Test
public void testUnionAll() {
ExecutionContext<Object[]> ctx = executionContext(true);