This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d9675ad06f IGNITE-21310 Sql. Introduce partition provider for further
pruning usage (#3102)
d9675ad06f is described below
commit d9675ad06f579956003a647d04c605a15b8c14b7
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Feb 7 11:38:36 2024 +0300
IGNITE-21310 Sql. Introduce partition provider for further pruning usage
(#3102)
---
.../sql/engine/exec/DestinationFactory.java | 18 ++--
.../internal/sql/engine/exec/ExecutableTable.java | 7 ++
.../engine/exec/ExecutableTableRegistryImpl.java | 16 ++-
.../sql/engine/exec/ExecutionServiceImpl.java | 4 -
.../sql/engine/exec/LogicalRelImplementor.java | 5 +-
.../engine/exec/RehashingPartitionExtractor.java | 51 +++++++++
.../sql/engine/exec/ResolvedDependencies.java | 11 ++
...utableTable.java => RowPartitionExtractor.java} | 25 ++---
.../sql/engine/exec/TablePartitionExtractor.java | 61 +++++++++++
.../internal/sql/engine/schema/IgniteTable.java | 5 +
.../sql/engine/schema/IgniteTableImpl.java | 29 +++++
.../sql/engine/schema/PartitionCalculator.java | 66 ++++++++++++
.../internal/sql/engine/trait/Partitioned.java | 12 +--
.../sql/engine/util/HashFunctionFactory.java | 58 ----------
.../sql/engine/util/HashFunctionFactoryImpl.java | 119 ---------------------
.../exec/ExecutionDependencyResolverSelfTest.java | 7 ++
.../sql/engine/exec/ExecutionServiceImplTest.java | 6 +-
.../exec/IdentityDistributionFunctionSelfTest.java | 2 +-
.../engine/exec/NoOpExecutableTableRegistry.java | 7 ++
.../sql/engine/exec/PartitionsResolutionTest.java | 115 ++++++++++++++++++++
.../sql/engine/framework/TestBuilders.java | 7 ++
.../sql/engine/prepare/TypeCoercionTest.java | 7 ++
.../sql/engine/util/HashFunctionsTest.java | 111 -------------------
23 files changed, 413 insertions(+), 336 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
index c41e71d616..2ecfba6b8f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
@@ -23,8 +23,10 @@ import static
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.AllNodes;
import org.apache.ignite.internal.sql.engine.trait.Destination;
@@ -35,7 +37,6 @@ import
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.Partitioned;
import org.apache.ignite.internal.sql.engine.trait.RandomNode;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
/**
* Factory that resolves {@link IgniteDistribution} trait, which represents
logical {@link DistributionFunction} function, into its
@@ -43,19 +44,16 @@ import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
*/
class DestinationFactory<RowT> {
private final RowHandler<RowT> rowHandler;
- private final HashFunctionFactory<RowT> hashFunctionFactory;
private final ResolvedDependencies dependencies;
/**
* Constructor.
*
* @param rowHandler Row handler.
- * @param hashFunctionFactory Hash-function factory required to resolve
hash-based distributions.
* @param dependencies Dependencies required to resolve row value
dependent distributions.
*/
- DestinationFactory(RowHandler<RowT> rowHandler, HashFunctionFactory<RowT>
hashFunctionFactory, ResolvedDependencies dependencies) {
+ DestinationFactory(RowHandler<RowT> rowHandler, ResolvedDependencies
dependencies) {
this.rowHandler = rowHandler;
- this.hashFunctionFactory = hashFunctionFactory;
this.dependencies = dependencies;
}
@@ -97,13 +95,17 @@ class DestinationFactory<RowT> {
if (function.affinity()) {
int tableId = ((AffinityDistribution) function).tableId();
-
+ Supplier<PartitionCalculator> calculator =
dependencies.partitionCalculator(tableId);
TableDescriptor tableDescriptor =
dependencies.tableDescriptor(tableId);
- return new Partitioned<>(assignments,
hashFunctionFactory.create(keys.toIntArray(), tableDescriptor));
+ var resolver = new
TablePartitionExtractor<>(calculator.get(), keys.toIntArray(), tableDescriptor,
rowHandler);
+
+ return new Partitioned<>(assignments, resolver);
}
- return new Partitioned<>(assignments,
hashFunctionFactory.create(keys.toIntArray()));
+ var resolver = new
RehashingPartitionExtractor<>(group.nodeNames().size(), keys.toIntArray(),
rowHandler);
+
+ return new Partitioned<>(group.nodeNames(), resolver);
}
default:
throw new IllegalStateException("Unsupported distribution
function.");
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
index 0c90af27d2..5e79344667 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
/**
@@ -37,4 +39,9 @@ public interface ExecutableTable {
* Returns a descriptor for the table.
*/
TableDescriptor tableDescriptor();
+
+ /**
+ * Return partition correspondence calculator.
+ */
+ Supplier<PartitionCalculator> partitionCalculator();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 4bc47473ca..1492aa415d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -22,12 +22,14 @@ import java.util.BitSet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.table.InternalTable;
@@ -93,7 +95,7 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
UpdatableTableImpl updatableTable = new
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
replicaService, clock, rowConverter);
- return new ExecutableTableImpl(scannableTable,
updatableTable);
+ return new ExecutableTableImpl(scannableTable,
updatableTable, sqlTable.partitionCalculator());
});
}
@@ -102,12 +104,16 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
private final UpdatableTable updatableTable;
+ private final Supplier<PartitionCalculator> partitionCalculator;
+
private ExecutableTableImpl(
ScannableTable scannableTable,
- UpdatableTable updatableTable
+ UpdatableTable updatableTable,
+ Supplier<PartitionCalculator> partitionCalculator
) {
this.scannableTable = scannableTable;
this.updatableTable = updatableTable;
+ this.partitionCalculator = partitionCalculator;
}
/** {@inheritDoc} */
@@ -127,6 +133,12 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
public TableDescriptor tableDescriptor() {
return updatableTable.descriptor();
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ return partitionCalculator;
+ }
}
private static CacheKey cacheKey(int tableId, int version) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index b6761cb624..ccf603e995 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -109,7 +109,6 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
@@ -193,8 +192,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
ExecutionDependencyResolver dependencyResolver,
long shutdownTimeout
) {
- HashFunctionFactoryImpl<RowT> rowHashFunctionFactory = new
HashFunctionFactoryImpl<>(handler);
-
return new ExecutionServiceImpl<>(
msgSrvc,
topSrvc,
@@ -207,7 +204,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
dependencyResolver,
(ctx, deps) -> new LogicalRelImplementor<>(
ctx,
- rowHashFunctionFactory,
mailboxRegistry,
exchangeSrvc,
deps),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 92beb1150d..520224b86e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -122,7 +122,6 @@ import
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
/**
* Implements a query plan.
@@ -148,14 +147,12 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
* Constructor.
*
* @param ctx Root context.
- * @param hashFuncFactory Factory to create a hash function for the row,
from which the destination nodes are calculated.
* @param mailboxRegistry Mailbox registry.
* @param exchangeSvc Exchange service.
* @param resolvedDependencies Dependencies required to execute this query.
*/
public LogicalRelImplementor(
ExecutionContext<RowT> ctx,
- HashFunctionFactory<RowT> hashFuncFactory,
MailboxRegistry mailboxRegistry,
ExchangeService exchangeSvc,
ResolvedDependencies resolvedDependencies) {
@@ -165,7 +162,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
this.resolvedDependencies = resolvedDependencies;
expressionFactory = ctx.expressionFactory();
- destinationFactory = new DestinationFactory<>(ctx.rowHandler(),
hashFuncFactory, resolvedDependencies);
+ destinationFactory = new DestinationFactory<>(ctx.rowHandler(),
resolvedDependencies);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
new file mode 100644
index 0000000000..496b492924
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/** Extract assignment based on incoming row. */
+public class RehashingPartitionExtractor<RowT> implements
RowPartitionExtractor<RowT> {
+ private final int targetCount;
+ private final int[] fields;
+ private final RowHandler<RowT> rowHandler;
+
+ /** Constructor. */
+ public RehashingPartitionExtractor(
+ int targetCount,
+ int[] fields,
+ RowHandler<RowT> rowHandler
+ ) {
+ this.targetCount = targetCount;
+ this.fields = fields;
+ this.rowHandler = rowHandler;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int partition(RowT row) {
+ int hash = 0;
+ for (int columnId : fields) {
+ Object value = rowHandler.get(columnId, row);
+
+ hash = 31 * hash + (value == null ? 0 : value.hashCode());
+ }
+
+ return IgniteUtils.safeAbs(hash) % targetCount;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
index 2371b09f87..871ff9aeb6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
/**
@@ -62,6 +64,15 @@ public class ResolvedDependencies {
return executableTable.tableDescriptor();
}
+ /**
+ * Returns partitions extractor.
+ */
+ public Supplier<PartitionCalculator> partitionCalculator(int tableId) {
+ ExecutableTable executableTable = getTable(tableId);
+
+ return executableTable.partitionCalculator();
+ }
+
/** Returns data source instance by given id. */
public ScannableDataSource dataSource(int dataSourceId) {
ScannableDataSource dataSource = dataSourceMap.get(dataSourceId);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
similarity index 68%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
index 0c90af27d2..2aa8093b3b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowPartitionExtractor.java
@@ -17,24 +17,13 @@
package org.apache.ignite.internal.sql.engine.exec;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-
-/**
- * Execution related APIs of a table.
- */
-public interface ExecutableTable {
- /**
- * Returns read API.
- */
- ScannableTable scannableTable();
-
- /**
- * Returns table modification API.
- */
- UpdatableTable updatableTable();
-
+/** Calculate partition according to supplied row. */
+@FunctionalInterface
+public interface RowPartitionExtractor<RowT> {
/**
- * Returns a descriptor for the table.
+ * Calculate partition based on supplied row.
+ *
+ * @return Resolved partition.
*/
- TableDescriptor tableDescriptor();
+ int partition(RowT row);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
new file mode 100644
index 0000000000..4b46a329ce
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TablePartitionExtractor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+
+/** Extract partition based on supplied row. */
+public class TablePartitionExtractor<RowT> implements
RowPartitionExtractor<RowT> {
+ private final PartitionCalculator partitionCalculator;
+ private final int[] fields;
+ private final TableDescriptor tableDescriptor;
+ private final RowHandler<RowT> rowHandler;
+
+ /** Constructor. */
+ public TablePartitionExtractor(
+ PartitionCalculator partitionCalculator,
+ int[] fields,
+ TableDescriptor tableDescriptor,
+ RowHandler<RowT> rowHandler
+ ) {
+ this.partitionCalculator = partitionCalculator;
+ this.fields = fields;
+ this.tableDescriptor = tableDescriptor;
+ this.rowHandler = rowHandler;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int partition(RowT row) {
+ int[] colocationColumns =
tableDescriptor.distribution().getKeys().toIntArray();
+
+ for (int i = 0; i < fields.length; i++) {
+ Object value = rowHandler.get(fields[i], row);
+
+ NativeTypeSpec nativeTypeSpec =
tableDescriptor.columnDescriptor(colocationColumns[i]).physicalType().spec();
+ Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec,
true);
+ value = TypeUtils.fromInternal(value, storageType);
+ partitionCalculator.append(value);
+ }
+
+ return partitionCalculator.partition();
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
index 56959335b9..53a886f84b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java
@@ -18,11 +18,16 @@
package org.apache.ignite.internal.sql.engine.schema;
import java.util.Map;
+import java.util.function.Supplier;
/**
* Table representation as object in SQL schema.
*/
public interface IgniteTable extends IgniteDataSource {
+ /**
+ * Return partition correspondence calculator.
+ */
+ Supplier<PartitionCalculator> partitionCalculator();
/**
* Returns all table indexes.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 1e412f468b..ec5efd20e9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.sql.engine.schema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
@@ -26,6 +28,8 @@ import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.schema.Statistic;
import
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.util.Lazy;
/**
* Table implementation for sql engine.
@@ -36,6 +40,8 @@ public class IgniteTableImpl extends AbstractIgniteDataSource
implements IgniteT
private final int partitions;
+ private final Lazy<NativeType[]> colocationColumnTypes;
+
/** Constructor. */
public IgniteTableImpl(String name, int id, int version, TableDescriptor
desc,
Statistic statistic, Map<String, IgniteIndex> indexMap, int
partitions) {
@@ -43,6 +49,29 @@ public class IgniteTableImpl extends
AbstractIgniteDataSource implements IgniteT
super(name, id, version, desc, statistic);
this.indexMap = indexMap;
this.partitions = partitions;
+
+ colocationColumnTypes = new Lazy<>(this::evaluateTypes);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ return () -> new PartitionCalculator(partitions,
Objects.requireNonNull(colocationColumnTypes.get()));
+ }
+
+ private NativeType[] evaluateTypes() {
+ int fieldCnt = descriptor().distribution().getKeys().size();
+ NativeType[] fieldTypes = new NativeType[fieldCnt];
+
+ int[] colocationColumns =
descriptor().distribution().getKeys().toIntArray();
+
+ for (int i = 0; i < fieldCnt; i++) {
+ ColumnDescriptor colDesc =
descriptor().columnDescriptor(colocationColumns[i]);
+
+ fieldTypes[i] = colDesc.physicalType();
+ }
+
+ return fieldTypes;
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
new file mode 100644
index 0000000000..ee63b1d8ca
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/PartitionCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.schema;
+
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.util.ColocationUtils;
+import org.apache.ignite.internal.util.HashCalculator;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/** Extract partition based on supplied row and types info. */
+public class PartitionCalculator {
+ private final HashCalculator hashCalculator = new HashCalculator();
+ private final NativeType[] types;
+ private final int partitionCount;
+
+ private int currentField = 0;
+
+ /** Constructor. */
+ public PartitionCalculator(int partitionCount, NativeType[] types) {
+ this.partitionCount = partitionCount;
+ this.types = types;
+ }
+
+ /**
+ * Append object partition to be calculated for.
+ *
+ * @param value The object for which partition will be calculated.
+ */
+ public void append(@Nullable Object value) {
+ assert currentField < types.length;
+
+ ColocationUtils.append(hashCalculator, value, types[currentField++]);
+ }
+
+ /**
+ * Calculate partition based on appending objects.
+ *
+ * @return Resolved partition.
+ */
+ public int partition() {
+ assert currentField == types.length;
+
+ try {
+ return IgniteUtils.safeAbs(hashCalculator.hash()) % partitionCount;
+ } finally {
+ hashCalculator.reset();
+ currentField = 0;
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
index 1a6eb02b90..344c2f4f6a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java
@@ -20,9 +20,8 @@ package org.apache.ignite.internal.sql.engine.trait;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.RowPartitionExtractor;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
-import org.apache.ignite.internal.util.IgniteUtils;
/**
* Partitioned.
@@ -31,21 +30,22 @@ import org.apache.ignite.internal.util.IgniteUtils;
public final class Partitioned<RowT> implements Destination<RowT> {
private final List<List<String>> assignments;
- private final RowHashFunction<RowT> partFun;
+ private final RowPartitionExtractor<RowT> calc;
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public Partitioned(List<String> assignments, RowHashFunction<RowT>
partFun) {
+ public Partitioned(List<String> assignments, RowPartitionExtractor<RowT>
calc) {
+ this.calc = calc;
this.assignments = Commons.transform(assignments, List::of);
- this.partFun = partFun;
}
/** {@inheritDoc} */
@Override
public List<String> targets(RowT row) {
- return assignments.get(IgniteUtils.safeAbs(partFun.hashOf(row) %
assignments.size()));
+ int part = calc.partition(row);
+ return assignments.get(part);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
deleted file mode 100644
index c755a880cc..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-
-/**
- * Factory for creating a function to calculate the hash of the specified
fields of the row.
- */
-public interface HashFunctionFactory<T> {
- /**
- * A function to calculate hash of given row.
- *
- * @param <T> A type of the row.
- */
- @FunctionalInterface
- interface RowHashFunction<T> {
- /**
- * Calculates hash of given row.
- *
- * @param row A row to calculate hash for.
- * @return A hash of the row.
- */
- int hashOf(T row);
- }
-
- /**
- * Creates a hash function to compute a composite hash of a row, given the
values of the fields.
- *
- * @param fields Field ordinals of the row from which the hash is to be
calculated.
- * @return Function to compute a composite hash of a row, given the values
of the fields.
- */
- RowHashFunction<T> create(int[] fields);
-
- /**
- * Creates a hash function to compute a composite hash of a row, given the
types and values of the fields.
- *
- * @param fields Field ordinals of the row from which the hash is to be
calculated.
- * @param tableDescriptor Table descriptor.
- * @return Function to compute a composite hash of a row, given the types
and values of the fields.
- */
- RowHashFunction<T> create(int[] fields, TableDescriptor tableDescriptor);
-}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
deleted file mode 100644
index b58d2d9467..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import java.util.Objects;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.type.NativeType;
-import org.apache.ignite.internal.type.NativeTypeSpec;
-import org.apache.ignite.internal.util.ColocationUtils;
-import org.apache.ignite.internal.util.HashCalculator;
-
-/**
- * Factory for creating a function to calculate the hash of the specified
fields of a row.
- */
-public class HashFunctionFactoryImpl<T> implements HashFunctionFactory<T> {
- private final RowHandler<T> rowHandler;
-
- public HashFunctionFactoryImpl(RowHandler<T> rowHandler) {
- this.rowHandler = rowHandler;
- }
-
- /** {@inheritDoc} */
- @Override
- public RowHashFunction<T> create(int[] fields, TableDescriptor
tableDescriptor) {
- int fieldCnt = fields.length;
- NativeType[] fieldTypes = new NativeType[fieldCnt];
- ImmutableIntList colocationColumns =
tableDescriptor.distribution().getKeys();
-
- assert colocationColumns.size() == fieldCnt : "fieldsCount=" +
fieldCnt + ", colocationColumns=" + colocationColumns;
-
- for (int i = 0; i < fieldCnt; i++) {
- ColumnDescriptor colDesc =
tableDescriptor.columnDescriptor(colocationColumns.getInt(i));
-
- fieldTypes[i] = colDesc.physicalType();
- }
-
- return new TypesAwareHashFunction<>(fields, fieldTypes, rowHandler);
- }
-
- /** {@inheritDoc} */
- @Override
- public RowHashFunction<T> create(int[] fields) {
- return new SimpleHashFunction<>(fields, rowHandler);
- }
-
- /**
- * Computes a composite hash of a row, given the values of the fields.
- */
- static class SimpleHashFunction<T> implements RowHashFunction<T> {
- private final int[] fields;
- private final RowHandler<T> rowHandler;
-
- SimpleHashFunction(int[] fields, RowHandler<T> rowHandler) {
- this.fields = fields;
- this.rowHandler = rowHandler;
- }
-
- @Override
- public int hashOf(T row) {
- int hash = 0;
-
- for (int idx : fields) {
- hash = 31 * hash + Objects.hashCode(rowHandler.get(idx, row));
- }
-
- return hash;
- }
- }
-
- /**
- * Computes a composite hash of a row, given the types and values of the
fields.
- */
- static class TypesAwareHashFunction<T> implements RowHashFunction<T> {
- private final int[] fields;
- private final NativeType[] fieldTypes;
- private final RowHandler<T> rowHandler;
-
- TypesAwareHashFunction(int[] fields, NativeType[] fieldTypes,
RowHandler<T> rowHandler) {
- this.fields = fields;
- this.fieldTypes = fieldTypes;
- this.rowHandler = rowHandler;
- }
-
- @Override
- public int hashOf(T row) {
- HashCalculator hashCalc = new HashCalculator();
-
- for (int i = 0; i < fields.length; i++) {
- Object value = rowHandler.get(fields[i], row);
- NativeTypeSpec nativeTypeSpec = fieldTypes[i].spec();
- Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec,
true);
-
- value = TypeUtils.fromInternal(value, storageType);
-
- ColocationUtils.append(hashCalc, value, fieldTypes[i]);
- }
-
- return hashCalc.hash();
- }
- }
-}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
index a7625b1c29..b02cb4890e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
@@ -38,6 +39,7 @@ import
org.apache.ignite.internal.sql.engine.framework.TestTable;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.type.NativeTypes;
@@ -253,6 +255,11 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
public TableDescriptor tableDescriptor() {
return updates.descriptor();
}
+
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ return null;
+ }
}
private static TestTable createTestTable(String tableName) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5ff4454f37..514a17b3f6 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -121,8 +121,6 @@ import
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -1070,9 +1068,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
MailboxRegistry mailboxRegistry,
ExchangeService exchangeService,
ResolvedDependencies deps) {
- HashFunctionFactory<Object[]> funcFactory = new
HashFunctionFactoryImpl<>(ctx.rowHandler());
-
- return new LogicalRelImplementor<>(ctx, funcFactory,
mailboxRegistry, exchangeService, deps) {
+ return new LogicalRelImplementor<>(ctx, mailboxRegistry,
exchangeService, deps) {
@Override
public Node<Object[]> visit(IgniteTableScan rel) {
return new ScanNode<>(ctx, dataset) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
index 2294b7896c..20bd685e03 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
@@ -55,7 +55,7 @@ public class IdentityDistributionFunctionSelfTest {
.build();
private final ColocationGroup colocationGroup = new
ColocationGroup(List.of(1L), List.of(NODE_1, NODE_2, NODE_3), List.of());
- private final DestinationFactory<Object[]> destinationFactory = new
DestinationFactory<>(rowHandler, null, null);
+ private final DestinationFactory<Object[]> destinationFactory = new
DestinationFactory<>(rowHandler, null);
@Test
public void identityDistributionTrait() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
index 0639502f5a..c300103102 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
/** Stub implementation for {@link ExecutableTableRegistry}. */
@@ -54,6 +56,11 @@ public final class NoOpExecutableTableRegistry implements
ExecutableTableRegistr
throw noDependency();
}
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ throw new UnsupportedOperationException();
+ }
+
private IllegalStateException noDependency() {
return new IllegalStateException("NoOpExecutableTable: " +
tableId);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
new file mode 100644
index 0000000000..32ce12ede8
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionsResolutionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Assignments resolution test. */
+public class PartitionsResolutionTest {
+ @Test
+ public void partitionsResolver() {
+ RowHandler<RowWrapper> rowHandler = SqlRowHandler.INSTANCE;
+ RowFactory<RowWrapper> factory = rowHandler.factory(rowSchema);
+ RowWrapper row = factory.create("1", 100, 200, 100);
+
+ int part1 = getPartition(row, rowHandler, List.of(0, 1));
+ int part2 = getPartition(row, rowHandler, List.of(0, 2));
+ int part3 = getPartition(row, rowHandler, List.of(0, 3));
+
+ assertEquals(part1, part3);
+ assertNotEquals(part1, part2);
+ }
+
+ @Test
+ public void rehashingPartitionsResolver() {
+ RowHandler<RowWrapper> rowHandler = SqlRowHandler.INSTANCE;
+ RowFactory<RowWrapper> factory = rowHandler.factory(rowSchema);
+ RowWrapper row = factory.create("1", 100, 200, 100);
+ int[] keys1 = {0, 1};
+ int[] keys2 = {0, 2};
+ int[] keys3 = {0, 3};
+
+ var resolver1 = new RehashingPartitionExtractor<>(1000, keys1,
rowHandler);
+ var resolver2 = new RehashingPartitionExtractor<>(1000, keys2,
rowHandler);
+ var resolver3 = new RehashingPartitionExtractor<>(1000, keys3,
rowHandler);
+
+ int part1 = resolver1.partition(row);
+ int part2 = resolver2.partition(row);
+ int part3 = resolver3.partition(row);
+
+ assertEquals(part1, part3);
+ assertEquals(part1, resolver1.partition(row));
+ assertNotEquals(part1, part2);
+ }
+
+ private static int getPartition(RowWrapper row, RowHandler<RowWrapper>
rowHandler, List<Integer> distrKeys) {
+ TableDescriptor desc = createTableDescriptor(distrKeys);
+
+ int[] colocationColumns = desc.distribution().getKeys().toIntArray();
+
+ NativeType[] fieldTypes = new NativeType[colocationColumns.length];
+
+ for (int i = 0; i < colocationColumns.length; ++i) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+ fieldTypes[i] = colDesc.physicalType();
+ }
+
+ PartitionCalculator calc = new PartitionCalculator(100, fieldTypes);
+ TablePartitionExtractor<RowWrapper> extractor = new
TablePartitionExtractor<>(calc, colocationColumns, desc, rowHandler);
+ return extractor.partition(row);
+ }
+
+ private final RowSchema rowSchema = RowSchema.builder()
+ .addField(NativeTypes.STRING)
+ .addField(NativeTypes.INT32)
+ .addField(NativeTypes.INT32)
+ .addField(NativeTypes.INT32)
+ .build();
+
+ private static TableDescriptor createTableDescriptor(List<Integer>
distrKeys) {
+ Builder rowTypeBuilder = new Builder(Commons.typeFactory());
+
+ rowTypeBuilder = rowTypeBuilder.add("col1", SqlTypeName.VARCHAR)
+ .add("col2", SqlTypeName.INTEGER)
+ .add("col3", SqlTypeName.INTEGER)
+ .add("col4", SqlTypeName.INTEGER);
+
+ RelDataType rowType = rowTypeBuilder.build();
+
+ return new TestTableDescriptor(() ->
IgniteDistributions.affinity(distrKeys, 1, 1), rowType);
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index d87fd33df4..0435768ddf 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
@@ -85,6 +86,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
@@ -1239,6 +1241,11 @@ public class TestBuilders {
public TableDescriptor tableDescriptor() {
return table.descriptor();
}
+
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ throw new UnsupportedOperationException();
+ }
});
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
index 4c51b57e9a..2c2e92ffbb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.avatica.util.TimeUnit;
@@ -63,6 +64,7 @@ import
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
@@ -645,6 +647,11 @@ public class TypeCoercionTest extends AbstractPlannerTest {
throw new AssertionError();
}
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ return null;
+ }
+
@Override
public Map<String, IgniteIndex> indexes() {
return Map.of();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
deleted file mode 100644
index 3bd1a4fbb5..0000000000
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import java.util.Arrays;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.SimpleHashFunction;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.TypesAwareHashFunction;
-import org.apache.ignite.internal.type.NativeType;
-import org.apache.ignite.internal.type.NativeTypes;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-/**
- * Basic tests for hash functions, which can be created using {@link
HashFunctionFactory}.
- */
-class HashFunctionsTest {
- /**
- * Ensures that the hash function accepts {@code null} values in row.
- */
- @ParameterizedTest
- @EnumSource
- public void checkNull(HashFunc func) {
- func.hash(new Object[]{null}, 0);
- }
-
- /**
- * Ensures that the hash function accepts empty row.
- */
- @ParameterizedTest
- @EnumSource
- public void checkEmpty(HashFunc func) {
- Assertions.assertEquals(0, func.hash(new Object[]{}));
- }
-
- /**
- * Ensures that the hash is computed according to the specified field
ordinals
- * and that the order of the ordinal numbers matters.
- */
- @ParameterizedTest
- @EnumSource
- public void checkOrder(HashFunc func) {
- Object[] row = {100, 200, 100};
-
- Assertions.assertNotEquals(
- func.hash(row, 0, 1),
- func.hash(row, 1, 0)
- );
-
- Assertions.assertEquals(
- func.hash(row, 0, 2),
- func.hash(row, 2, 0)
- );
- }
-
- enum HashFunc {
- SIMPLE,
- TYPE_AWARE;
-
- /**
- * Compute hash.
- *
- * @param row Row to process.
- * @param keys Ordinal numbers of key fields.
- * @return Composite hash for the specified fields.
- */
- int hash(Object[] row, int... keys) {
- RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
- RowHashFunction<Object[]> func;
-
- switch (this) {
- case SIMPLE:
- func = new SimpleHashFunction<>(keys, rowHandler);
-
- break;
-
- case TYPE_AWARE:
- NativeType[] fieldTypes = new NativeType[keys.length];
-
- Arrays.fill(fieldTypes, NativeTypes.INT32);
-
- func = new TypesAwareHashFunction<>(keys, fieldTypes,
rowHandler);
-
- break;
-
- default:
- throw new UnsupportedOperationException();
- }
-
- return func.hashOf(row);
- }
- }
-}