This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5a7bac093 IGNITE-20030 Sql. Delete operation fails if PK is set on
arbitrary columns (#2397)
a5a7bac093 is described below
commit a5a7bac09367b91ac7c2f241afca154ee96f4aae
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Sat Aug 12 10:44:13 2023 +0300
IGNITE-20030 Sql. Delete operation fails if PK is set on arbitrary columns
(#2397)
---
.../ignite/internal/sql/engine/ItDmlTest.java | 19 +++++
.../sql/engine/exec/UpdatableTableImpl.java | 92 ++++++++++++++--------
.../sql/engine/rule/TableModifyConverterRule.java | 27 ++++++-
.../sql/engine/framework/TestBuilders.java | 13 +++
.../sql/engine/planner/DmlPlannerTest.java | 38 +++++++++
5 files changed, 157 insertions(+), 32 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 27ed03a443..6c9f9a6ed4 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -612,6 +612,25 @@ public class ItDmlTest extends
ClusterPerClassIntegrationTest {
.check();
}
+ @Test
+ public void testDeleteUsingCompositePk() {
+ sql("CREATE TABLE test (a INT, b VARCHAR NOT NULL, c INT NOT NULL, d
INT NOT NULL, PRIMARY KEY(d, b)) COLOCATE BY (d)");
+ sql("INSERT INTO test VALUES "
+ + "(0, '3', 0, 1),"
+ + "(0, '3', 0, 2),"
+ + "(0, '4', 0, 2)");
+
+ // Use PK index (predicate covers both key columns, declared out of column order).
+ sql("DELETE FROM test WHERE b = '3' and d = 2");
+ assertQuery("SELECT d FROM test").returns(1).returns(2).check();
+
+ sql("DELETE FROM test WHERE d = 1"); // Predicate on the colocation column only.
+ assertQuery("SELECT b FROM test").returns("4").check();
+
+ sql("DELETE FROM test WHERE a = 0"); // Predicate on a non-key column; removes the remaining row.
+ assertQuery("SELECT d FROM test").returnNothing().check();
+ }
+
private static void checkDuplicatePk(IgniteException ex) {
assertEquals(CONSTRAINT_VIOLATION_ERR, ex.code());
assertThat(ex.getMessage(), containsString("PK unique constraint is
violated"));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 516b1a219a..97889cc187 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -26,10 +26,11 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -47,6 +48,7 @@ import
org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -72,7 +74,16 @@ public final class UpdatableTableImpl implements
UpdatableTable {
private final SchemaDescriptor schemaDescriptor;
- private final List<ColumnDescriptor> columnsOrderedByPhysSchema;
+ private final ColumnDescriptor[] columnsOrderedByPhysSchema;
+
+ private final ColumnDescriptor[] keyColumnsOrderedByPhysSchema;
+
+ /**
+ * Mapping of key column indexes to their ordinals in the ordered list.
+ * It is used during a delete operation to assemble key-only binary row
from
+ * "truncated" relational node row containing only primary key columns.
+ */
+ private final TargetMapping columnsMappingForKeyOnlyRow;
private final PartitionExtractor partitionExtractor;
@@ -99,14 +110,35 @@ public final class UpdatableTableImpl implements
UpdatableTable {
this.schemaDescriptor = schemaDescriptor;
this.partitionExtractor = (row) ->
IgniteUtils.safeAbs(row.colocationHash()) % partitions;
- List<ColumnDescriptor> tmp = new ArrayList<>(desc.columnsCount());
+ ColumnDescriptor[] tmp = new ColumnDescriptor[desc.columnsCount()];
for (int i = 0; i < desc.columnsCount(); i++) {
- tmp.add(desc.columnDescriptor(i));
- }
+ ColumnDescriptor columnDescriptor = desc.columnDescriptor(i);
- tmp.sort(Comparator.comparingInt(ColumnDescriptor::physicalIndex));
+ tmp[columnDescriptor.physicalIndex()] = columnDescriptor;
+ }
columnsOrderedByPhysSchema = tmp;
+
+ int keyColumnsCount = schemaDescriptor.keyColumns().length();
+
+ ColumnDescriptor[] keyCols = new ColumnDescriptor[keyColumnsCount];
+ int[] keyLogicalIndexes = new int[keyColumnsCount];
+ int counter = 0;
+
+ for (ColumnDescriptor colDesc : tmp) {
+ if (colDesc.key()) {
+ keyCols[counter] = colDesc;
+ keyLogicalIndexes[counter] = colDesc.logicalIndex();
+
+ counter++;
+ }
+ }
+
+ keyColumnsOrderedByPhysSchema = keyCols;
+
+ ImmutableBitSet keysBitSet = ImmutableBitSet.of(keyLogicalIndexes);
+ columnsMappingForKeyOnlyRow =
Commons.trimmingMapping(keysBitSet.size(), keysBitSet);
+
this.rowConverter = rowConverter;
}
@@ -124,7 +156,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRowEx binaryRow = convertRow(row, ectx, false);
+ BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler());
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new
ArrayList<>()).add(binaryRow);
}
@@ -189,7 +221,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRowEx binaryRow = convertRow(row, ectx, false);
+ BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler());
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new
ArrayList<>()).add(binaryRow);
}
@@ -251,7 +283,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRowEx binaryRow = convertRow(row, ectx, true);
+ BinaryRowEx binaryRow = convertKeyOnlyRow(row, ectx.rowHandler());
keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k ->
new ArrayList<>()).add(binaryRow);
}
@@ -280,41 +312,39 @@ public final class UpdatableTableImpl implements
UpdatableTable {
return CompletableFuture.allOf(futures);
}
- private <RowT> BinaryRowEx convertRow(RowT row, ExecutionContext<RowT>
ectx, boolean keyOnly) {
- RowHandler<RowT> hnd = ectx.rowHandler();
+ private <RowT> BinaryRowEx convertRow(RowT row, RowHandler<RowT> hnd) {
+ RowAssembler rowAssembler = new RowAssembler(schemaDescriptor);
for (ColumnDescriptor colDesc : columnsOrderedByPhysSchema) {
- if (keyOnly && !colDesc.key()) {
- continue;
- }
-
- Object value = hnd.get(colDesc.logicalIndex(), row);
-
- // TODO Remove this check when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
- assert value != DEFAULT_VALUE_PLACEHOLDER;
+ Object val = hnd.get(colDesc.logicalIndex(), row);
- if (value == null) {
- break;
- }
+ appendValue(rowAssembler, colDesc, val);
}
- RowAssembler rowAssembler = keyOnly ?
RowAssembler.keyAssembler(schemaDescriptor) : new
RowAssembler(schemaDescriptor);
-
- for (ColumnDescriptor colDesc : columnsOrderedByPhysSchema) {
- if (keyOnly && !colDesc.key()) {
- continue;
- }
+ return new Row(schemaDescriptor, rowAssembler.build());
+ }
- Object val = hnd.get(colDesc.logicalIndex(), row);
+ private <RowT> BinaryRowEx convertKeyOnlyRow(RowT row, RowHandler<RowT>
hnd) {
+ RowAssembler rowAssembler =
RowAssembler.keyAssembler(schemaDescriptor);
- val = TypeUtils.fromInternal(val,
NativeTypeSpec.toClass(colDesc.physicalType().spec(), colDesc.nullable()));
+ for (ColumnDescriptor colDesc : keyColumnsOrderedByPhysSchema) {
+ Object val =
hnd.get(columnsMappingForKeyOnlyRow.getTarget(colDesc.logicalIndex()), row);
- RowAssembler.writeValue(rowAssembler, colDesc.physicalType(), val);
+ appendValue(rowAssembler, colDesc, val);
}
return new Row(schemaDescriptor, rowAssembler.build());
}
+ private static void appendValue(RowAssembler rowAssembler,
ColumnDescriptor colDesc, Object val) {
+ // TODO Remove this check when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+ assert val != DEFAULT_VALUE_PLACEHOLDER;
+
+ val = TypeUtils.fromInternal(val,
NativeTypeSpec.toClass(colDesc.physicalType().spec(), colDesc.nullable()));
+
+ RowAssembler.writeValue(rowAssembler, colDesc.physicalType(), val);
+ }
+
private static <RowT> CompletableFuture<List<RowT>> handleInsertResults(
RowHandler<RowT> handler,
CompletableFuture<List<RowT>>[] futs
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
index 05b32c8013..702743d2a1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
@@ -44,10 +45,13 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.util.IgniteIntList;
/**
* TableModifyConverterRule.
@@ -71,8 +75,29 @@ public class TableModifyConverterRule extends
AbstractIgniteConverterRule<Logica
IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
assert igniteTable != null;
+ IgniteDistribution distribution = igniteTable.distribution();
+
+ if (rel.getOperation() == Operation.DELETE) {
+ // To perform the delete, we need a row with key fields only.
+ // Input distribution contains the indexes of the key columns
according to the schema (i.e. for the full row).
+ // Here we adjust the distribution keys so that a row containing
only the key fields can be read.
+ IgniteIntList keyFields = new IgniteIntList();
+
+ for (int i = 0; i < igniteTable.descriptor().columnsCount(); i++) {
+ ColumnDescriptor column =
igniteTable.descriptor().columnDescriptor(i);
+
+ if (column.key()) {
+ keyFields.add(column.logicalIndex());
+ }
+ }
+
+ ImmutableBitSet keysBitSet = ImmutableBitSet.of(keyFields.array());
+
+ distribution =
distribution.apply(Commons.trimmingMapping(keysBitSet.size(), keysBitSet));
+ }
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(igniteTable.distribution())
+ .replace(distribution)
.replace(RelCollations.EMPTY);
RelNode input = convert(rel.getInput(), traits);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2d0e283046..d38427b2a3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -669,6 +669,16 @@ public class TestBuilders {
return self();
}
+ /** {@inheritDoc} */
+ @Override
+ public ChildT addKeyColumn(String name, NativeType type) {
+ columns.add(new ColumnDescriptorImpl(
+ name, true, false, columns.size(), columns.size(), type,
DefaultValueStrategy.DEFAULT_NULL, null
+ ));
+
+ return self();
+ }
+
/** {@inheritDoc} */
@Override
public ChildT addColumn(String name, NativeType type) {
@@ -780,6 +790,9 @@ public class TestBuilders {
/** Sets the distribution of the table. */
ChildT distribution(IgniteDistribution distribution);
+ /** Adds a key column to the table. */
+ ChildT addKeyColumn(String name, NativeType type);
+
/** Adds a column to the table. */
ChildT addColumn(String name, NativeType type);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
index 6a1409f4fa..3023660850 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -146,6 +146,29 @@ public class DmlPlannerTest extends AbstractPlannerTest {
);
}
+ @ParameterizedTest
+ @MethodSource("distributionsForDelete")
+ public void testDelete(IgniteDistribution distribution) throws Exception {
+ IgniteTable test1 = TestBuilders.table()
+ .name("TEST1")
+ .addColumn("C1", NativeTypes.INT32)
+ .addKeyColumn("KEY1", NativeTypes.INT32)
+ .addColumn("C2", NativeTypes.INT32)
+ .addKeyColumn("KEY2", NativeTypes.INT32)
+ .distribution(distribution)
+ .build();
+
+ IgniteSchema schema = createSchema(test1);
+
+ // There should be no exchange between the modify node and the scan
node.
+ assertPlan("DELETE FROM TEST1 WHERE KEY1 = 1 and KEY2 = 2", schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class)
+ .and(input(isTableScan("TEST1")))))
+ );
+ }
+
private static Stream<IgniteDistribution> distributions() {
return Stream.of(
IgniteDistributions.single(),
@@ -154,6 +177,21 @@ public class DmlPlannerTest extends AbstractPlannerTest {
);
}
+ /**
+ * Creates a stream of non-single distributions with keys corresponding to
the indexes of the key columns of the table.
+ *
+ * @return Distributions to test DELETE operation.
+ */
+ private static Stream<IgniteDistribution> distributionsForDelete() {
+ return Stream.of(
+ IgniteDistributions.hash(List.of(1, 3)),
+ IgniteDistributions.affinity(1, 2, "0"),
+ IgniteDistributions.affinity(3, 2, "0"),
+ IgniteDistributions.affinity(List.of(1, 3), 2, "0"),
+ IgniteDistributions.affinity(List.of(3, 1), 2, "0")
+ );
+ }
+
/**
* Test for check basic dml operators when table doesn't exist.
*/