This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a5a7bac093 IGNITE-20030 Sql. Delete operation fails if PK is set on 
arbitrary columns (#2397)
a5a7bac093 is described below

commit a5a7bac09367b91ac7c2f241afca154ee96f4aae
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Sat Aug 12 10:44:13 2023 +0300

    IGNITE-20030 Sql. Delete operation fails if PK is set on arbitrary columns 
(#2397)
---
 .../ignite/internal/sql/engine/ItDmlTest.java      | 19 +++++
 .../sql/engine/exec/UpdatableTableImpl.java        | 92 ++++++++++++++--------
 .../sql/engine/rule/TableModifyConverterRule.java  | 27 ++++++-
 .../sql/engine/framework/TestBuilders.java         | 13 +++
 .../sql/engine/planner/DmlPlannerTest.java         | 38 +++++++++
 5 files changed, 157 insertions(+), 32 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 27ed03a443..6c9f9a6ed4 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -612,6 +612,25 @@ public class ItDmlTest extends 
ClusterPerClassIntegrationTest {
                 .check();
     }
 
+    @Test
+    public void testDeleteUsingCompositePk() {
+        sql("CREATE TABLE test (a INT, b VARCHAR NOT NULL, c INT NOT NULL, d 
INT NOT NULL, PRIMARY KEY(d, b)) COLOCATE BY (d)");
+        sql("INSERT INTO test VALUES "
+                + "(0, '3', 0, 1),"
+                + "(0, '3', 0, 2),"
+                + "(0, '4', 0, 2)");
+
+        // Use PK index.
+        sql("DELETE FROM test WHERE b = '3' and d = 2");
+        assertQuery("SELECT d FROM test").returns(1).returns(2).check();
+
+        sql("DELETE FROM test WHERE d = 1");
+        assertQuery("SELECT b FROM test").returns("4").check();
+
+        sql("DELETE FROM test WHERE a = 0");
+        assertQuery("SELECT d FROM test").returnNothing();
+    }
+
     private static void checkDuplicatePk(IgniteException ex) {
         assertEquals(CONSTRAINT_VIOLATION_ERR, ex.code());
         assertThat(ex.getMessage(), containsString("PK unique constraint is 
violated"));
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 516b1a219a..97889cc187 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -26,10 +26,11 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -47,6 +48,7 @@ import 
org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -72,7 +74,16 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
 
     private final SchemaDescriptor schemaDescriptor;
 
-    private final List<ColumnDescriptor> columnsOrderedByPhysSchema;
+    private final ColumnDescriptor[] columnsOrderedByPhysSchema;
+
+    private final ColumnDescriptor[] keyColumnsOrderedByPhysSchema;
+
+    /**
+     * Mapping of key column indexes to its ordinals in the ordered list.
+     * It is used during a delete operation to assemble key-only binary row 
from
+     * "truncated" relational node row containing only primary key columns.
+     */
+    private final TargetMapping columnsMappingForKeyOnlyRow;
 
     private final PartitionExtractor partitionExtractor;
 
@@ -99,14 +110,35 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         this.schemaDescriptor = schemaDescriptor;
         this.partitionExtractor = (row) -> 
IgniteUtils.safeAbs(row.colocationHash()) % partitions;
 
-        List<ColumnDescriptor> tmp = new ArrayList<>(desc.columnsCount());
+        ColumnDescriptor[] tmp = new ColumnDescriptor[desc.columnsCount()];
         for (int i = 0; i < desc.columnsCount(); i++) {
-            tmp.add(desc.columnDescriptor(i));
-        }
+            ColumnDescriptor columnDescriptor = desc.columnDescriptor(i);
 
-        tmp.sort(Comparator.comparingInt(ColumnDescriptor::physicalIndex));
+            tmp[columnDescriptor.physicalIndex()] = columnDescriptor;
+        }
 
         columnsOrderedByPhysSchema = tmp;
+
+        int keyColumnsCount = schemaDescriptor.keyColumns().length();
+
+        ColumnDescriptor[] keyCols = new ColumnDescriptor[keyColumnsCount];
+        int[] keyLogicalIndexes = new int[keyColumnsCount];
+        int counter = 0;
+
+        for (ColumnDescriptor colDesc : tmp) {
+            if (colDesc.key()) {
+                keyCols[counter] = colDesc;
+                keyLogicalIndexes[counter] = colDesc.logicalIndex();
+
+                counter++;
+            }
+        }
+
+        keyColumnsOrderedByPhysSchema = keyCols;
+
+        ImmutableBitSet keysBitSet = ImmutableBitSet.of(keyLogicalIndexes);
+        columnsMappingForKeyOnlyRow = 
Commons.trimmingMapping(keysBitSet.size(), keysBitSet);
+
         this.rowConverter = rowConverter;
     }
 
@@ -124,7 +156,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new 
Int2ObjectOpenHashMap<>();
 
         for (RowT row : rows) {
-            BinaryRowEx binaryRow = convertRow(row, ectx, false);
+            BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler());
 
             
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new 
ArrayList<>()).add(binaryRow);
         }
@@ -189,7 +221,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new 
Int2ObjectOpenHashMap<>();
 
         for (RowT row : rows) {
-            BinaryRowEx binaryRow = convertRow(row, ectx, false);
+            BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler());
 
             
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new 
ArrayList<>()).add(binaryRow);
         }
@@ -251,7 +283,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new 
Int2ObjectOpenHashMap<>();
 
         for (RowT row : rows) {
-            BinaryRowEx binaryRow = convertRow(row, ectx, true);
+            BinaryRowEx binaryRow = convertKeyOnlyRow(row, ectx.rowHandler());
 
             
keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> 
new ArrayList<>()).add(binaryRow);
         }
@@ -280,41 +312,39 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         return CompletableFuture.allOf(futures);
     }
 
-    private <RowT> BinaryRowEx convertRow(RowT row, ExecutionContext<RowT> 
ectx, boolean keyOnly) {
-        RowHandler<RowT> hnd = ectx.rowHandler();
+    private <RowT> BinaryRowEx convertRow(RowT row, RowHandler<RowT> hnd) {
+        RowAssembler rowAssembler = new RowAssembler(schemaDescriptor);
 
         for (ColumnDescriptor colDesc : columnsOrderedByPhysSchema) {
-            if (keyOnly && !colDesc.key()) {
-                continue;
-            }
-
-            Object value = hnd.get(colDesc.logicalIndex(), row);
-
-            // TODO Remove this check when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
-            assert value != DEFAULT_VALUE_PLACEHOLDER;
+            Object val = hnd.get(colDesc.logicalIndex(), row);
 
-            if (value == null) {
-                break;
-            }
+            appendValue(rowAssembler, colDesc, val);
         }
 
-        RowAssembler rowAssembler = keyOnly ? 
RowAssembler.keyAssembler(schemaDescriptor) : new 
RowAssembler(schemaDescriptor);
-
-        for (ColumnDescriptor colDesc : columnsOrderedByPhysSchema) {
-            if (keyOnly && !colDesc.key()) {
-                continue;
-            }
+        return new Row(schemaDescriptor, rowAssembler.build());
+    }
 
-            Object val = hnd.get(colDesc.logicalIndex(), row);
+    private <RowT> BinaryRowEx convertKeyOnlyRow(RowT row, RowHandler<RowT> 
hnd) {
+        RowAssembler rowAssembler = 
RowAssembler.keyAssembler(schemaDescriptor);
 
-            val = TypeUtils.fromInternal(val, 
NativeTypeSpec.toClass(colDesc.physicalType().spec(), colDesc.nullable()));
+        for (ColumnDescriptor colDesc : keyColumnsOrderedByPhysSchema) {
+            Object val = 
hnd.get(columnsMappingForKeyOnlyRow.getTarget(colDesc.logicalIndex()), row);
 
-            RowAssembler.writeValue(rowAssembler, colDesc.physicalType(), val);
+            appendValue(rowAssembler, colDesc, val);
         }
 
         return new Row(schemaDescriptor, rowAssembler.build());
     }
 
+    private static void appendValue(RowAssembler rowAssembler, 
ColumnDescriptor colDesc, Object val) {
+        // TODO Remove this check when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+        assert val != DEFAULT_VALUE_PLACEHOLDER;
+
+        val = TypeUtils.fromInternal(val, 
NativeTypeSpec.toClass(colDesc.physicalType().spec(), colDesc.nullable()));
+
+        RowAssembler.writeValue(rowAssembler, colDesc.physicalType(), val);
+    }
+
     private static <RowT> CompletableFuture<List<RowT>> handleInsertResults(
             RowHandler<RowT> handler,
             CompletableFuture<List<RowT>>[] futs
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
index 05b32c8013..702743d2a1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.TableModify.Operation;
 import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
@@ -44,10 +45,13 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.util.IgniteIntList;
 
 /**
  * TableModifyConverterRule.
@@ -71,8 +75,29 @@ public class TableModifyConverterRule extends 
AbstractIgniteConverterRule<Logica
         IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
         assert igniteTable != null;
 
+        IgniteDistribution distribution = igniteTable.distribution();
+
+        if (rel.getOperation() == Operation.DELETE) {
+            // To perform the delete, we need a row with key fields only.
+            // Input distribution contains the indexes of the key columns 
according to the schema (i.e. for the full row).
+            // Here we adjusting distribution keys so that a row containing 
only the key fields can be read.
+            IgniteIntList keyFields = new IgniteIntList();
+
+            for (int i = 0; i < igniteTable.descriptor().columnsCount(); i++) {
+                ColumnDescriptor column = 
igniteTable.descriptor().columnDescriptor(i);
+
+                if (column.key()) {
+                    keyFields.add(column.logicalIndex());
+                }
+            }
+
+            ImmutableBitSet keysBitSet = ImmutableBitSet.of(keyFields.array());
+
+            distribution = 
distribution.apply(Commons.trimmingMapping(keysBitSet.size(), keysBitSet));
+        }
+
         RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
-                .replace(igniteTable.distribution())
+                .replace(distribution)
                 .replace(RelCollations.EMPTY);
 
         RelNode input = convert(rel.getInput(), traits);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2d0e283046..d38427b2a3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -669,6 +669,16 @@ public class TestBuilders {
             return self();
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public ChildT addKeyColumn(String name, NativeType type) {
+            columns.add(new ColumnDescriptorImpl(
+                    name, true, false, columns.size(), columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
+            ));
+
+            return self();
+        }
+
         /** {@inheritDoc} */
         @Override
         public ChildT addColumn(String name, NativeType type) {
@@ -780,6 +790,9 @@ public class TestBuilders {
         /** Sets the distribution of the table. */
         ChildT distribution(IgniteDistribution distribution);
 
+        /** Adds a key column to the table. */
+        ChildT addKeyColumn(String name, NativeType type);
+
         /** Adds a column to the table. */
         ChildT addColumn(String name, NativeType type);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
index 6a1409f4fa..3023660850 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -146,6 +146,29 @@ public class DmlPlannerTest extends AbstractPlannerTest {
         );
     }
 
+    @ParameterizedTest
+    @MethodSource("distributionsForDelete")
+    public void testDelete(IgniteDistribution distribution) throws Exception {
+        IgniteTable test1 = TestBuilders.table()
+                .name("TEST1")
+                .addColumn("C1", NativeTypes.INT32)
+                .addKeyColumn("KEY1", NativeTypes.INT32)
+                .addColumn("C2", NativeTypes.INT32)
+                .addKeyColumn("KEY2", NativeTypes.INT32)
+                .distribution(distribution)
+                .build();
+
+        IgniteSchema schema = createSchema(test1);
+
+        // There should be no exchange between the modify node and the scan 
node.
+        assertPlan("DELETE FROM TEST1 WHERE KEY1 = 1 and KEY2 = 2", schema,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(e -> 
e.distribution().equals(IgniteDistributions.single())))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class)
+                                .and(input(isTableScan("TEST1")))))
+        );
+    }
+
     private static Stream<IgniteDistribution> distributions() {
         return Stream.of(
                 IgniteDistributions.single(),
@@ -154,6 +177,21 @@ public class DmlPlannerTest extends AbstractPlannerTest {
         );
     }
 
+    /**
+     * Creates a list of non-single distributions with keys corresponding to 
the indexes of the key columns of the table.
+     *
+     * @return Distributions to test DELETE operation.
+     */
+    private static Stream<IgniteDistribution> distributionsForDelete() {
+        return Stream.of(
+                IgniteDistributions.hash(List.of(1, 3)),
+                IgniteDistributions.affinity(1, 2, "0"),
+                IgniteDistributions.affinity(3, 2, "0"),
+                IgniteDistributions.affinity(List.of(1, 3), 2, "0"),
+                IgniteDistributions.affinity(List.of(3, 1), 2, "0")
+        );
+    }
+
     /**
      * Test for check basic dml operators when table doesn't exist.
      */

Reply via email to