This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d9675ad06f IGNITE-21310 Sql. Introduce partition provider for further 
pruning usage (#3102)
d9675ad06f is described below

commit d9675ad06f579956003a647d04c605a15b8c14b7
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Feb 7 11:38:36 2024 +0300

    IGNITE-21310 Sql. Introduce partition provider for further pruning usage 
(#3102)
---
 .../sql/engine/exec/DestinationFactory.java        |  18 ++--
 .../internal/sql/engine/exec/ExecutableTable.java  |   7 ++
 .../engine/exec/ExecutableTableRegistryImpl.java   |  16 ++-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   4 -
 .../sql/engine/exec/LogicalRelImplementor.java     |   5 +-
 .../engine/exec/RehashingPartitionExtractor.java   |  51 +++++++++
 .../sql/engine/exec/ResolvedDependencies.java      |  11 ++
 ...utableTable.java => RowPartitionExtractor.java} |  25 ++---
 .../sql/engine/exec/TablePartitionExtractor.java   |  61 +++++++++++
 .../internal/sql/engine/schema/IgniteTable.java    |   5 +
 .../sql/engine/schema/IgniteTableImpl.java         |  29 +++++
 .../sql/engine/schema/PartitionCalculator.java     |  66 ++++++++++++
 .../internal/sql/engine/trait/Partitioned.java     |  12 +--
 .../sql/engine/util/HashFunctionFactory.java       |  58 ----------
 .../sql/engine/util/HashFunctionFactoryImpl.java   | 119 ---------------------
 .../exec/ExecutionDependencyResolverSelfTest.java  |   7 ++
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   6 +-
 .../exec/IdentityDistributionFunctionSelfTest.java |   2 +-
 .../engine/exec/NoOpExecutableTableRegistry.java   |   7 ++
 .../sql/engine/exec/PartitionsResolutionTest.java  | 115 ++++++++++++++++++++
 .../sql/engine/framework/TestBuilders.java         |   7 ++
 .../sql/engine/prepare/TypeCoercionTest.java       |   7 ++
 .../sql/engine/util/HashFunctionsTest.java         | 111 -------------------
 23 files changed, 413 insertions(+), 336 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
index c41e71d616..2ecfba6b8f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
@@ -23,8 +23,10 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.AllNodes;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
@@ -35,7 +37,6 @@ import 
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.Partitioned;
 import org.apache.ignite.internal.sql.engine.trait.RandomNode;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
 
 /**
  * Factory that resolves {@link IgniteDistribution} trait, which represents 
logical {@link DistributionFunction} function, into its
@@ -43,19 +44,16 @@ import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
  */
 class DestinationFactory<RowT> {
     private final RowHandler<RowT> rowHandler;
-    private final HashFunctionFactory<RowT> hashFunctionFactory;
     private final ResolvedDependencies dependencies;
 
     /**
      * Constructor.
      *
      * @param rowHandler Row handler.
-     * @param hashFunctionFactory Hash-function factory required to resolve 
hash-based distributions.
      * @param dependencies Dependencies required to resolve row value 
dependent distributions.
      */
-    DestinationFactory(RowHandler<RowT> rowHandler, HashFunctionFactory<RowT> 
hashFunctionFactory, ResolvedDependencies dependencies) {
+    DestinationFactory(RowHandler<RowT> rowHandler, ResolvedDependencies 
dependencies) {
         this.rowHandler = rowHandler;
-        this.hashFunctionFactory = hashFunctionFactory;
         this.dependencies = dependencies;
     }
 
@@ -97,13 +95,17 @@ class DestinationFactory<RowT> {
 
                 if (function.affinity()) {
                     int tableId = ((AffinityDistribution) function).tableId();
-
+                    Supplier<PartitionCalculator> calculator = 
dependencies.partitionCalculator(tableId);
                     TableDescriptor tableDescriptor = 
dependencies.tableDescriptor(tableId);
 
-                    return new Partitioned<>(assignments, 
hashFunctionFactory.create(keys.toIntArray(), tableDescriptor));
+                    var resolver = new 
TablePartitionExtractor<>(calculator.get(), keys.toIntArray(), tableDescriptor, 
rowHandler);
+
+                    return new Partitioned<>(assignments, resolver);
                 }
 
-                return new Partitioned<>(assignments, 
hashFunctionFactory.create(keys.toIntArray()));
+                var resolver = new 
RehashingPartitionExtractor<>(group.nodeNames().size(), keys.toIntArray(), 
rowHandler);
+
+                return new Partitioned<>(group.nodeNames(), resolver);
             }
             default:
                 throw new IllegalStateException("Unsupported distribution 
function.");
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
index 0c90af27d2..5e79344667 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 
 /**
@@ -37,4 +39,9 @@ public interface ExecutableTable {
      * Returns a descriptor for the table.
      */
     TableDescriptor tableDescriptor();
+
+    /**
+     * Return partition correspondence calculator.
+     */
+    Supplier<PartitionCalculator> partitionCalculator();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 4bc47473ca..1492aa415d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -22,12 +22,14 @@ import java.util.BitSet;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.table.InternalTable;
@@ -93,7 +95,7 @@ public class ExecutableTableRegistryImpl implements 
ExecutableTableRegistry {
                     UpdatableTableImpl updatableTable = new 
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
                             replicaService, clock, rowConverter);
 
-                    return new ExecutableTableImpl(scannableTable, 
updatableTable);
+                    return new ExecutableTableImpl(scannableTable, 
updatableTable, sqlTable.partitionCalculator());
                 });
     }
 
@@ -102,12 +104,16 @@ public class ExecutableTableRegistryImpl implements 
ExecutableTableRegistry {
 
         private final UpdatableTable updatableTable;
 
+        private final Supplier<PartitionCalculator> partitionCalculator;
+
         private ExecutableTableImpl(
                 ScannableTable scannableTable,
-                UpdatableTable updatableTable
+                UpdatableTable updatableTable,
+                Supplier<PartitionCalculator> partitionCalculator
         ) {
             this.scannableTable = scannableTable;
             this.updatableTable = updatableTable;
+            this.partitionCalculator = partitionCalculator;
         }
 
         /** {@inheritDoc} */
@@ -127,6 +133,12 @@ public class ExecutableTableRegistryImpl implements 
ExecutableTableRegistry {
         public TableDescriptor tableDescriptor() {
             return updatableTable.descriptor();
         }
+
+        /** {@inheritDoc} */
+        @Override
+        public Supplier<PartitionCalculator> partitionCalculator() {
+            return partitionCalculator;
+        }
     }
 
     private static CacheKey cacheKey(int tableId, int version) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index b6761cb624..ccf603e995 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -109,7 +109,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.AsyncCursor;
@@ -193,8 +192,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             ExecutionDependencyResolver dependencyResolver,
             long shutdownTimeout
     ) {
-        HashFunctionFactoryImpl<RowT> rowHashFunctionFactory = new 
HashFunctionFactoryImpl<>(handler);
-
         return new ExecutionServiceImpl<>(
                 msgSrvc,
                 topSrvc,
@@ -207,7 +204,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 dependencyResolver,
                 (ctx, deps) -> new LogicalRelImplementor<>(
                         ctx,
-                        rowHashFunctionFactory,
                         mailboxRegistry,
                         exchangeSrvc,
                         deps),
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 92beb1150d..520224b86e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -122,7 +122,6 @@ import 
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
 
 /**
  * Implements a query plan.
@@ -148,14 +147,12 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
      * Constructor.
      *
      * @param ctx Root context.
-     * @param hashFuncFactory Factory to create a hash function for the row, 
from which the destination nodes are calculated.
      * @param mailboxRegistry Mailbox registry.
      * @param exchangeSvc Exchange service.
      * @param resolvedDependencies Dependencies required to execute this query.
      */
     public LogicalRelImplementor(
             ExecutionContext<RowT> ctx,
-            HashFunctionFactory<RowT> hashFuncFactory,
             MailboxRegistry mailboxRegistry,
             ExchangeService exchangeSvc,
             ResolvedDependencies resolvedDependencies) {
@@ -165,7 +162,7 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         this.resolvedDependencies = resolvedDependencies;
 
         expressionFactory = ctx.expressionFactory();
-        destinationFactory = new DestinationFactory<>(ctx.rowHandler(), 
hashFuncFactory, resolvedDependencies);
+        destinationFactory = new DestinationFactory<>(ctx.rowHandler(), 
resolvedDependencies);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
new file mode 100644
index 0000000000..496b492924
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/** Extract assignment based on incoming row. */
+public class RehashingPartitionExtractor<RowT> implements 
RowPartitionExtractor<RowT> {
+    private final int targetCount;
+    private final int[] fields;
+    private final RowHandler<RowT> rowHandler;
+
+    /** Constructor. */
+    public RehashingPartitionExtractor(
+            int targetCount,
+            int[] fields,
+            RowHandler<RowT> rowHandler
+    ) {
+        this.targetCount = targetCount;
+        this.fields = fields;
+        this.rowHandler = rowHandler;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int partition(RowT row) {
+        int hash = 0;
+        for (int columnId : fields) {
+            Object value = rowHandler.get(columnId, row);
+
+            hash = 31 * hash + (value == null ? 0 : value.hashCode());
+        }
+
+        return IgniteUtils.safeAbs(hash) % targetCount;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
index 2371b09f87..871ff9aeb6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 
 /**
@@ -62,6 +64,15 @@ public class ResolvedDependencies {
         return executableTable.tableDescriptor();
     }
 
+    /**
+     * Returns partitions extractor.
+     */
+    public Supplier<PartitionCalculator> partitionCalculator(int tableId) {
+        ExecutableTable executableTable = getTable(tableId);
+
+        return executableTable.partitionCalculator();
+    }
+
     /** Returns data source instance by given id. */
     public ScannableDataSource dataSource(int dataSourceId) {
         ScannableDataSource dataSource = dataSourceMap.get(dataSourceId);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
similarity index 68%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
index 0c90af27d2..2aa8093b3b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
@@ -17,24 +17,13 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-
-/**
- * Execution related APIs of a table.
- */
-public interface ExecutableTable {
-    /**
-     * Returns read API.
-     */
-    ScannableTable scannableTable();
-
-    /**
-     * Returns table modification API.
-     */
-    UpdatableTable updatableTable();
-
+/** Calculate partition according to supplied row. */
+@FunctionalInterface
+public interface RowPartitionExtractor<RowT> {
     /**
-     * Returns a descriptor for the table.
+     * Calculate partition based on supplied row.
+     *
+     * @return Resolved partition.
      */
-    TableDescriptor tableDescriptor();
+    int partition(RowT row);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
new file mode 100644
index 0000000000..4b46a329ce
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+
+/** Extract partition based on supplied row.  */
+public class TablePartitionExtractor<RowT> implements 
RowPartitionExtractor<RowT> {
+    private final PartitionCalculator partitionCalculator;
+    private final int[] fields;
+    private final TableDescriptor tableDescriptor;
+    private final RowHandler<RowT> rowHandler;
+
+    /** Constructor. */
+    public TablePartitionExtractor(
+            PartitionCalculator partitionCalculator,
+            int[] fields,
+            TableDescriptor tableDescriptor,
+            RowHandler<RowT> rowHandler
+    ) {
+        this.partitionCalculator = partitionCalculator;
+        this.fields = fields;
+        this.tableDescriptor = tableDescriptor;
+        this.rowHandler = rowHandler;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int partition(RowT row) {
+        int[] colocationColumns = 
tableDescriptor.distribution().getKeys().toIntArray();
+
+        for (int i = 0; i < fields.length; i++) {
+            Object value = rowHandler.get(fields[i], row);
+
+            NativeTypeSpec nativeTypeSpec = 
tableDescriptor.columnDescriptor(colocationColumns[i]).physicalType().spec();
+            Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec, 
true);
+            value = TypeUtils.fromInternal(value, storageType);
+            partitionCalculator.append(value);
+        }
+
+        return partitionCalculator.partition();
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
index 56959335b9..53a886f84b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
@@ -18,11 +18,16 @@
 package org.apache.ignite.internal.sql.engine.schema;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Table representation as object in SQL schema.
  */
 public interface IgniteTable extends IgniteDataSource {
+    /**
+     * Return partition correspondence calculator.
+     */
+    Supplier<PartitionCalculator> partitionCalculator();
 
     /**
      * Returns all table indexes.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 1e412f468b..ec5efd20e9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.sql.engine.schema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
@@ -26,6 +28,8 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.schema.Statistic;
 import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.util.Lazy;
 
 /**
  * Table implementation for sql engine.
@@ -36,6 +40,8 @@ public class IgniteTableImpl extends AbstractIgniteDataSource 
implements IgniteT
 
     private final int partitions;
 
+    private final Lazy<NativeType[]> colocationColumnTypes;
+
     /** Constructor. */
     public IgniteTableImpl(String name, int id, int version, TableDescriptor 
desc,
             Statistic statistic, Map<String, IgniteIndex> indexMap, int 
partitions) {
@@ -43,6 +49,29 @@ public class IgniteTableImpl extends 
AbstractIgniteDataSource implements IgniteT
         super(name, id, version, desc, statistic);
         this.indexMap = indexMap;
         this.partitions = partitions;
+
+        colocationColumnTypes = new Lazy<>(this::evaluateTypes);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Supplier<PartitionCalculator> partitionCalculator() {
+        return () -> new PartitionCalculator(partitions, 
Objects.requireNonNull(colocationColumnTypes.get()));
+    }
+
+    private NativeType[] evaluateTypes() {
+        int fieldCnt = descriptor().distribution().getKeys().size();
+        NativeType[] fieldTypes = new NativeType[fieldCnt];
+
+        int[] colocationColumns = 
descriptor().distribution().getKeys().toIntArray();
+
+        for (int i = 0; i < fieldCnt; i++) {
+            ColumnDescriptor colDesc = 
descriptor().columnDescriptor(colocationColumns[i]);
+
+            fieldTypes[i] = colDesc.physicalType();
+        }
+
+        return fieldTypes;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
new file mode 100644
index 0000000000..ee63b1d8ca
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.schema;
+
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.util.ColocationUtils;
+import org.apache.ignite.internal.util.HashCalculator;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/** Extract partition based on supplied row and types info. */
+public class PartitionCalculator {
+    private final HashCalculator hashCalculator = new HashCalculator();
+    private final NativeType[] types;
+    private final int partitionCount;
+
+    private int currentField = 0;
+
+    /** Constructor. */
+    public PartitionCalculator(int partitionCount, NativeType[] types) {
+        this.partitionCount = partitionCount;
+        this.types = types;
+    }
+
+    /**
+     * Append object partition to be calculated for.
+     *
+     * @param value The object for which partition will be calculated.
+     */
+    public void append(@Nullable Object value) {
+        assert currentField < types.length;
+
+        ColocationUtils.append(hashCalculator, value, types[currentField++]);
+    }
+
+    /**
+     * Calculate partition based on appending objects.
+     *
+     * @return Resolved partition.
+     */
+    public int partition() {
+        assert currentField == types.length;
+
+        try {
+            return IgniteUtils.safeAbs(hashCalculator.hash()) % partitionCount;
+        } finally {
+            hashCalculator.reset();
+            currentField = 0;
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
index 1a6eb02b90..344c2f4f6a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
@@ -20,9 +20,8 @@ package org.apache.ignite.internal.sql.engine.trait;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.RowPartitionExtractor;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
-import org.apache.ignite.internal.util.IgniteUtils;
 
 /**
  * Partitioned.
@@ -31,21 +30,22 @@ import org.apache.ignite.internal.util.IgniteUtils;
 public final class Partitioned<RowT> implements Destination<RowT> {
     private final List<List<String>> assignments;
 
-    private final RowHashFunction<RowT> partFun;
+    private final RowPartitionExtractor<RowT> calc;
 
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public Partitioned(List<String> assignments, RowHashFunction<RowT> 
partFun) {
+    public Partitioned(List<String> assignments, RowPartitionExtractor<RowT> 
calc) {
+        this.calc = calc;
         this.assignments = Commons.transform(assignments, List::of);
-        this.partFun = partFun;
     }
 
     /** {@inheritDoc} */
     @Override
     public List<String> targets(RowT row) {
-        return assignments.get(IgniteUtils.safeAbs(partFun.hashOf(row) % 
assignments.size()));
+        int part = calc.partition(row);
+        return assignments.get(part);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
deleted file mode 100644
index c755a880cc..0000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-
-/**
- * Factory for creating a function to calculate the hash of the specified 
fields of the row.
- */
-public interface HashFunctionFactory<T> {
-    /**
-     * A function to calculate hash of given row.
-     *
-     * @param <T> A type of the row.
-     */
-    @FunctionalInterface
-    interface RowHashFunction<T> {
-        /**
-         * Calculates hash of given row.
-         *
-         * @param row A row to calculate hash for.
-         * @return A hash of the row.
-         */
-        int hashOf(T row);
-    }
-
-    /**
-     * Creates a hash function to compute a composite hash of a row, given the 
values of the fields.
-     *
-     * @param fields Field ordinals of the row from which the hash is to be 
calculated.
-     * @return Function to compute a composite hash of a row, given the values 
of the fields.
-     */
-    RowHashFunction<T> create(int[] fields);
-
-    /**
-     * Creates a hash function to compute a composite hash of a row, given the 
types and values of the fields.
-     *
-     * @param fields Field ordinals of the row from which the hash is to be 
calculated.
-     * @param tableDescriptor Table descriptor.
-     * @return Function to compute a composite hash of a row, given the types 
and values of the fields.
-     */
-    RowHashFunction<T> create(int[] fields, TableDescriptor tableDescriptor);
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
deleted file mode 100644
index b58d2d9467..0000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import java.util.Objects;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.type.NativeType;
-import org.apache.ignite.internal.type.NativeTypeSpec;
-import org.apache.ignite.internal.util.ColocationUtils;
-import org.apache.ignite.internal.util.HashCalculator;
-
-/**
- * Factory for creating a function to calculate the hash of the specified 
fields of a row.
- */
-public class HashFunctionFactoryImpl<T> implements HashFunctionFactory<T> {
-    private final RowHandler<T> rowHandler;
-
-    public HashFunctionFactoryImpl(RowHandler<T> rowHandler) {
-        this.rowHandler = rowHandler;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public RowHashFunction<T> create(int[] fields, TableDescriptor 
tableDescriptor) {
-        int fieldCnt = fields.length;
-        NativeType[] fieldTypes = new NativeType[fieldCnt];
-        ImmutableIntList colocationColumns = 
tableDescriptor.distribution().getKeys();
-
-        assert colocationColumns.size() == fieldCnt : "fieldsCount=" + 
fieldCnt + ", colocationColumns=" + colocationColumns;
-
-        for (int i = 0; i < fieldCnt; i++) {
-            ColumnDescriptor colDesc = 
tableDescriptor.columnDescriptor(colocationColumns.getInt(i));
-
-            fieldTypes[i] = colDesc.physicalType();
-        }
-
-        return new TypesAwareHashFunction<>(fields, fieldTypes, rowHandler);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public RowHashFunction<T> create(int[] fields) {
-        return new SimpleHashFunction<>(fields, rowHandler);
-    }
-
-    /**
-     * Computes a composite hash of a row, given the values of the fields.
-     */
-    static class SimpleHashFunction<T> implements RowHashFunction<T> {
-        private final int[] fields;
-        private final RowHandler<T> rowHandler;
-
-        SimpleHashFunction(int[] fields, RowHandler<T> rowHandler) {
-            this.fields = fields;
-            this.rowHandler = rowHandler;
-        }
-
-        @Override
-        public int hashOf(T row) {
-            int hash = 0;
-
-            for (int idx : fields) {
-                hash = 31 * hash + Objects.hashCode(rowHandler.get(idx, row));
-            }
-
-            return hash;
-        }
-    }
-
-    /**
-     * Computes a composite hash of a row, given the types and values of the 
fields.
-     */
-    static class TypesAwareHashFunction<T> implements RowHashFunction<T> {
-        private final int[] fields;
-        private final NativeType[] fieldTypes;
-        private final RowHandler<T> rowHandler;
-
-        TypesAwareHashFunction(int[] fields, NativeType[] fieldTypes, 
RowHandler<T> rowHandler) {
-            this.fields = fields;
-            this.fieldTypes = fieldTypes;
-            this.rowHandler = rowHandler;
-        }
-
-        @Override
-        public int hashOf(T row) {
-            HashCalculator hashCalc = new HashCalculator();
-
-            for (int i = 0; i < fields.length; i++) {
-                Object value = rowHandler.get(fields[i], row);
-                NativeTypeSpec nativeTypeSpec = fieldTypes[i].spec();
-                Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec, 
true);
-
-                value = TypeUtils.fromInternal(value, storageType);
-
-                ColocationUtils.append(hashCalc, value, fieldTypes[i]);
-            }
-
-            return hashCalc.hash();
-        }
-    }
-}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
index a7625b1c29..b02cb4890e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
@@ -38,6 +39,7 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestTable;
 import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -253,6 +255,11 @@ public class ExecutionDependencyResolverSelfTest extends 
AbstractPlannerTest {
         public TableDescriptor tableDescriptor() {
             return updates.descriptor();
         }
+
+        @Override
+        public Supplier<PartitionCalculator> partitionCalculator() {
+            return null;
+        }
     }
 
     private static TestTable createTestTable(String tableName) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5ff4454f37..514a17b3f6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -121,8 +121,6 @@ import 
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -1070,9 +1068,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                     MailboxRegistry mailboxRegistry,
                     ExchangeService exchangeService,
                     ResolvedDependencies deps) {
-                HashFunctionFactory<Object[]> funcFactory = new 
HashFunctionFactoryImpl<>(ctx.rowHandler());
-
-                return new LogicalRelImplementor<>(ctx, funcFactory, 
mailboxRegistry, exchangeService, deps) {
+                return new LogicalRelImplementor<>(ctx, mailboxRegistry, 
exchangeService, deps) {
                     @Override
                     public Node<Object[]> visit(IgniteTableScan rel) {
                         return new ScanNode<>(ctx, dataset) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
index 2294b7896c..20bd685e03 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
@@ -55,7 +55,7 @@ public class IdentityDistributionFunctionSelfTest {
             .build();
 
     private final ColocationGroup colocationGroup = new 
ColocationGroup(List.of(1L), List.of(NODE_1, NODE_2, NODE_3), List.of());
-    private final DestinationFactory<Object[]> destinationFactory = new 
DestinationFactory<>(rowHandler, null, null);
+    private final DestinationFactory<Object[]> destinationFactory = new 
DestinationFactory<>(rowHandler, null);
 
     @Test
     public void identityDistributionTrait() {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
index 0639502f5a..c300103102 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 
 /** Stub implementation for {@link ExecutableTableRegistry}. */
@@ -54,6 +56,11 @@ public final class NoOpExecutableTableRegistry implements 
ExecutableTableRegistr
             throw noDependency();
         }
 
+        @Override
+        public Supplier<PartitionCalculator> partitionCalculator() {
+            throw new UnsupportedOperationException();
+        }
+
         private IllegalStateException noDependency() {
             return new IllegalStateException("NoOpExecutableTable: " + 
tableId);
         }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
new file mode 100644
index 0000000000..32ce12ede8
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Assignments resolution test. */
+public class PartitionsResolutionTest {
+    @Test
+    public void partitionsResolver() {
+        RowHandler<RowWrapper> rowHandler = SqlRowHandler.INSTANCE;
+        RowFactory<RowWrapper> factory = rowHandler.factory(rowSchema);
+        RowWrapper row = factory.create("1", 100, 200, 100);
+
+        int part1 = getPartition(row, rowHandler, List.of(0, 1));
+        int part2 = getPartition(row, rowHandler, List.of(0, 2));
+        int part3 = getPartition(row, rowHandler, List.of(0, 3));
+
+        assertEquals(part1, part3);
+        assertNotEquals(part1, part2);
+    }
+
+    @Test
+    public void rehashingPartitionsResolver() {
+        RowHandler<RowWrapper> rowHandler = SqlRowHandler.INSTANCE;
+        RowFactory<RowWrapper> factory = rowHandler.factory(rowSchema);
+        RowWrapper row = factory.create("1", 100, 200, 100);
+        int[] keys1 = {0, 1};
+        int[] keys2 = {0, 2};
+        int[] keys3 = {0, 3};
+
+        var resolver1 = new RehashingPartitionExtractor<>(1000, keys1, 
rowHandler);
+        var resolver2 = new RehashingPartitionExtractor<>(1000, keys2, 
rowHandler);
+        var resolver3 = new RehashingPartitionExtractor<>(1000, keys3, 
rowHandler);
+
+        int part1 = resolver1.partition(row);
+        int part2 = resolver2.partition(row);
+        int part3 = resolver3.partition(row);
+
+        assertEquals(part1, part3);
+        assertEquals(part1, resolver1.partition(row));
+        assertNotEquals(part1, part2);
+    }
+
+    private static int getPartition(RowWrapper row, RowHandler<RowWrapper> 
rowHandler, List<Integer> distrKeys) {
+        TableDescriptor desc = createTableDescriptor(distrKeys);
+
+        int[] colocationColumns = desc.distribution().getKeys().toIntArray();
+
+        NativeType[] fieldTypes = new NativeType[colocationColumns.length];
+
+        for (int i = 0; i < colocationColumns.length; ++i) {
+            ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+            fieldTypes[i] = colDesc.physicalType();
+        }
+
+        PartitionCalculator calc = new PartitionCalculator(100, fieldTypes);
+        TablePartitionExtractor<RowWrapper> extractor = new 
TablePartitionExtractor<>(calc, colocationColumns, desc, rowHandler);
+        return extractor.partition(row);
+    }
+
+    private final RowSchema rowSchema = RowSchema.builder()
+            .addField(NativeTypes.STRING)
+            .addField(NativeTypes.INT32)
+            .addField(NativeTypes.INT32)
+            .addField(NativeTypes.INT32)
+            .build();
+
+    private static TableDescriptor createTableDescriptor(List<Integer> 
distrKeys) {
+        Builder rowTypeBuilder = new Builder(Commons.typeFactory());
+
+        rowTypeBuilder = rowTypeBuilder.add("col1", SqlTypeName.VARCHAR)
+        .add("col2", SqlTypeName.INTEGER)
+        .add("col3", SqlTypeName.INTEGER)
+        .add("col4", SqlTypeName.INTEGER);
+
+        RelDataType rowType =  rowTypeBuilder.build();
+
+        return new TestTableDescriptor(() -> 
IgniteDistributions.affinity(distrKeys, 1, 1), rowType);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index d87fd33df4..0435768ddf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogCommand;
@@ -85,6 +86,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
@@ -1239,6 +1241,11 @@ public class TestBuilders {
                 public TableDescriptor tableDescriptor() {
                     return table.descriptor();
                 }
+
+                @Override
+                public Supplier<PartitionCalculator> partitionCalculator() {
+                    throw new UnsupportedOperationException();
+                }
             });
         }
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
index 4c51b57e9a..2c2e92ffbb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.avatica.util.TimeUnit;
@@ -63,6 +64,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
@@ -645,6 +647,11 @@ public class TypeCoercionTest extends AbstractPlannerTest {
             throw new AssertionError();
         }
 
+        @Override
+        public Supplier<PartitionCalculator> partitionCalculator() {
+            return null;
+        }
+
         @Override
         public Map<String, IgniteIndex> indexes() {
             return Map.of();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
deleted file mode 100644
index 3bd1a4fbb5..0000000000
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import java.util.Arrays;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
-import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.SimpleHashFunction;
-import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.TypesAwareHashFunction;
-import org.apache.ignite.internal.type.NativeType;
-import org.apache.ignite.internal.type.NativeTypes;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-/**
- * Basic tests for hash functions, which can be created using {@link 
HashFunctionFactory}.
- */
-class HashFunctionsTest {
-    /**
-     * Ensures that the hash function accepts {@code null} values in row.
-     */
-    @ParameterizedTest
-    @EnumSource
-    public void checkNull(HashFunc func) {
-        func.hash(new Object[]{null}, 0);
-    }
-
-    /**
-     * Ensures that the hash function accepts empty row.
-     */
-    @ParameterizedTest
-    @EnumSource
-    public void checkEmpty(HashFunc func) {
-        Assertions.assertEquals(0, func.hash(new Object[]{}));
-    }
-
-    /**
-     * Ensures that the hash is computed according to the specified field 
ordinals
-     * and that the order of the ordinal numbers matters.
-     */
-    @ParameterizedTest
-    @EnumSource
-    public void checkOrder(HashFunc func) {
-        Object[] row = {100, 200, 100};
-
-        Assertions.assertNotEquals(
-                func.hash(row, 0, 1),
-                func.hash(row, 1, 0)
-        );
-
-        Assertions.assertEquals(
-                func.hash(row, 0, 2),
-                func.hash(row, 2, 0)
-        );
-    }
-
-    enum HashFunc {
-        SIMPLE,
-        TYPE_AWARE;
-
-        /**
-         * Compute hash.
-         *
-         * @param row Row to process.
-         * @param keys Ordinal numbers of key fields.
-         * @return Composite hash for the specified fields.
-         */
-        int hash(Object[] row, int... keys) {
-            RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
-            RowHashFunction<Object[]> func;
-
-            switch (this) {
-                case SIMPLE:
-                    func = new SimpleHashFunction<>(keys, rowHandler);
-
-                    break;
-
-                case TYPE_AWARE:
-                    NativeType[] fieldTypes = new NativeType[keys.length];
-
-                    Arrays.fill(fieldTypes, NativeTypes.INT32);
-
-                    func = new TypesAwareHashFunction<>(keys, fieldTypes, 
rowHandler);
-
-                    break;
-
-                default:
-                    throw new UnsupportedOperationException();
-            }
-
-            return func.hashOf(row);
-        }
-    }
-}

Reply via email to