This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bccdb5e4ab IGNITE-21401 DataStreamer data removal: improve 
RW_UPSERT_ALL to support delete (#3148)
bccdb5e4ab is described below

commit bccdb5e4abd6204b9cb52740d0c2e61c546dab2d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Feb 5 13:08:36 2024 +0200

    IGNITE-21401 DataStreamer data removal: improve RW_UPSERT_ALL to support 
delete (#3148)
    
    Add `ReadWriteMultiRowReplicaRequest#deleted` to indicate delete 
operations. Initially the idea was to use key-only rows to indicate deletes, but 
that does not work for single-column tables, where a key-only row is 
indistinguishable from a full row.
---
 .../ignite/client/fakes/FakeInternalTable.java     |  2 +-
 .../ignite/internal/table/ItInternalTableTest.java | 37 ++++++++++++++++
 .../ignite/internal/table/InternalTable.java       |  7 +--
 .../internal/table/KeyValueBinaryViewImpl.java     |  2 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  2 +-
 .../internal/table/RecordBinaryViewImpl.java       |  2 +-
 .../ignite/internal/table/RecordViewImpl.java      |  2 +-
 .../request/ReadWriteMultiRowReplicaRequest.java   | 10 +++++
 .../replicator/PartitionReplicaListener.java       | 50 ++++++++++++++++++----
 .../distributed/storage/InternalTableImpl.java     | 32 ++++++++++++--
 10 files changed, 125 insertions(+), 21 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 39731f6565..d296ac70ff 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -185,7 +185,7 @@ public class FakeInternalTable implements InternalTable {
     }
 
     @Override
-    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition) {
+    public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, 
@Nullable BitSet deleted, int partition) {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index 5eb2f5b965..b6cb3a5a38 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -32,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
@@ -472,6 +474,41 @@ public class ItInternalTableTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    public void updateAllWithDeleteTest() {
+        InternalTable internalTable = ((TableViewInternal) 
table).internalTable();
+
+        RecordView<Tuple> view = table.recordView();
+        view.upsert(null, Tuple.create().set("key", 1L).set("valInt", 
1).set("valStr", "val1"));
+        view.upsert(null, Tuple.create().set("key", 3L).set("valInt", 
3).set("valStr", "val3"));
+
+        // Update existing key 1, update existing key 3, delete missing key 5.
+        List<BinaryRowEx> rows = List.of(
+                createKeyValueRow(1, 11, "val11"),
+                createKeyValueRow(3, 2, "val2"),
+                createKeyRow(5)
+        );
+
+        int partitionId = internalTable.partitionId(rows.get(0));
+
+        for (int i = 0; i < rows.size(); i++) {
+            assertEquals(partitionId, internalTable.partitionId(rows.get(i)), 
"Unexpected partition for row " + i);
+        }
+
+        BitSet deleted = new BitSet(3);
+        deleted.set(2);
+        internalTable.updateAll(rows, deleted, partitionId).join();
+
+        var row1 = view.get(null, Tuple.create().set("key", 1L));
+        assertEquals(11, row1.intValue("valInt"));
+
+        var row2 = view.get(null, Tuple.create().set("key", 3L));
+        assertEquals(2, row2.intValue("valInt"));
+
+        var row3 = view.get(null, Tuple.create().set("key", 5L));
+        assertNull(row3);
+    }
+
     private ArrayList<BinaryRowEx> 
populateEvenKeysAndPrepareEntriesToLookup(boolean keyOnly) {
         KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index ad5bc18e0a..cfd274c129 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -156,13 +156,14 @@ public interface InternalTable extends ManuallyCloseable {
     CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable 
InternalTransaction tx);
 
     /**
-     * Asynchronously inserts records into a table, if they do not exist, or 
replaces the existing ones.
+     * Asynchronously updates records in the table (insert, update, delete).
      *
-     * @param rows Rows to insert into the table.
+     * @param rows Rows to update.
+     * @param deleted Bit set indicating deleted rows (one bit per item in 
{@code rows}). When null, no rows are deleted.
      * @param partition Partition that the rows belong to.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition);
+    CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, @Nullable 
BitSet deleted, int partition);
 
     /**
      * Asynchronously inserts a row into the table or replaces if exists and 
return replaced previous row.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 60ff561d8e..8b089518d0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -545,7 +545,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         var partitioner = new 
KeyValueTupleStreamerPartitionAwarenessProvider(rowConverter.registry(), 
tbl.partitions());
         StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender = 
(partitionId, items) -> {
             return withSchemaSync(null, (schemaVersion) -> {
-                return this.tbl.upsertAll(marshalPairs(items, schemaVersion), 
partitionId);
+                return this.tbl.updateAll(marshalPairs(items, schemaVersion), 
null, partitionId);
             });
         };
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index d193078f71..f6c423a7c9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -690,7 +690,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
 
         StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId, 
items) -> {
             return withSchemaSync(null, (schemaVersion) -> {
-                return this.tbl.upsertAll(marshalPairs(items, schemaVersion), 
partitionId);
+                return this.tbl.updateAll(marshalPairs(items, schemaVersion), 
null, partitionId);
             });
         };
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 422c99a46f..b876838ac5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -432,7 +432,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
 
         var partitioner = new 
TupleStreamerPartitionAwarenessProvider(rowConverter.registry(), 
tbl.partitions());
         StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items) 
-> withSchemaSync(null,
-                (schemaVersion) -> this.tbl.upsertAll(mapToBinary(items, 
schemaVersion, false), partitionId));
+                (schemaVersion) -> this.tbl.updateAll(mapToBinary(items, 
schemaVersion, false), null, partitionId));
 
         return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 92ba1f71df..c4e87750e1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -533,7 +533,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
 
         StreamerBatchSender<R, Integer> batchSender = (partitionId, items) -> {
             return withSchemaSync(null, (schemaVersion) -> {
-                return this.tbl.upsertAll(marshal(items, schemaVersion), 
partitionId);
+                return this.tbl.updateAll(marshal(items, schemaVersion), null, 
partitionId);
             });
         };
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index f2476ca46b..8eb899ead6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.util.BitSet;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Read-write multi-row replica request.
@@ -31,4 +33,12 @@ public interface ReadWriteMultiRowReplicaRequest extends 
MultipleRowReplicaReque
      * @return {@code True} to disable the delayed ack optimization.
      */
     boolean skipDelayedAck();
+
+    /**
+     * Deleted flags (one for every tuple in {@link #binaryTuples()}).
+     *
+     * @return A bit for every tuple in {@link #binaryTuples()} indicating a 
delete operation.
+     */
+    @Nullable
+    BitSet deleted();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 728268da12..198915d783 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -51,6 +51,7 @@ import static 
org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -2282,11 +2283,23 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] 
rowIdFuts = new CompletableFuture[searchRows.size()];
 
                 Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+                BitSet deleted = request.deleted();
 
                 for (int i = 0; i < searchRows.size(); i++) {
                     BinaryRow searchRow = searchRows.get(i);
 
-                    rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId, 
(rowId, row, lastCommitTime) -> {
+                    boolean isDelete = deleted != null && deleted.get(i);
+
+                    BinaryTuple pk = isDelete
+                            ? resolvePk(searchRow.tupleSlice())
+                            : extractPk(searchRow);
+
+                    rowIdFuts[i] = resolveRowByPk(pk, txId, (rowId, row, 
lastCommitTime) -> {
+                        if (isDelete && rowId == null) {
+                            // Does not exist, nothing to delete.
+                            return nullCompletedFuture();
+                        }
+
                         boolean insert = rowId == null;
 
                         RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
@@ -2295,6 +2308,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                         }
 
+                        if (isDelete) {
+                            assert row != null;
+
+                            return takeLocksForDelete(row, rowId0, txId)
+                                    .thenApply(id -> new IgniteBiTuple<>(id, 
null));
+                        }
+
                         return insert
                                 ? takeLocksForInsert(searchRow, rowId0, txId)
                                 : takeLocksForUpdate(searchRow, rowId0, txId);
@@ -2306,13 +2326,21 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     List<RowId> rows = new ArrayList<>();
 
                     for (int i = 0; i < searchRows.size(); i++) {
-                        RowId lockedRow = rowIdFuts[i].join().get1();
+                        IgniteBiTuple<RowId, Collection<Lock>> locks = 
rowIdFuts[i].join();
+                        if (locks == null) {
+                            continue;
+                        }
+
+                        RowId lockedRow = locks.get1();
+
+                        TimedBinaryRowMessageBuilder 
timedBinaryRowMessageBuilder = MSG_FACTORY.timedBinaryRowMessage()
+                                
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())));
+
+                        if (deleted == null || !deleted.get(i)) {
+                            
timedBinaryRowMessageBuilder.binaryRowMessage(binaryRowMessage(searchRows.get(i)));
+                        }
 
-                        rowsToUpdate.put(lockedRow.uuid(),
-                                MSG_FACTORY.timedBinaryRowMessage()
-                                        
.binaryRowMessage(binaryRowMessage(searchRows.get(i)))
-                                        
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())))
-                                        .build());
+                        rowsToUpdate.put(lockedRow.uuid(), 
timedBinaryRowMessageBuilder.build());
 
                         rows.add(lockedRow);
                     }
@@ -2333,8 +2361,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             .thenApply(res -> {
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> rowIdFut : rowIdFuts) {
-                                    rowIdFut.join().get2()
-                                            .forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
+                                    IgniteBiTuple<RowId, Collection<Lock>> 
futRes = rowIdFut.join();
+                                    Collection<Lock> locks = futRes == null ? 
null : futRes.get2();
+
+                                    if (locks != null) {
+                                        locks.forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
+                                    }
                                 }
 
                                 return new ReplicaResult(null, res);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 01e463165c..3e91a11422 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1008,14 +1008,14 @@ public class InternalTableImpl implements InternalTable 
{
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition) {
+    public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, 
@Nullable BitSet deleted, int partition) {
         InternalTransaction tx = txManager.begin(observableTimestampTracker);
         TablePartitionId partGroupId = new TablePartitionId(tableId, 
partition);
 
         CompletableFuture<Void> fut = enlistWithRetry(
                 tx,
                 partition,
-                enlistmentConsistencyToken -> upsertAllInternal(rows, tx, 
partGroupId, enlistmentConsistencyToken, true),
+                enlistmentConsistencyToken -> upsertAllInternal(rows, deleted, 
tx, partGroupId, enlistmentConsistencyToken, true),
                 true,
                 null,
                 true // Allow auto retries for data streamer.
@@ -1077,7 +1077,14 @@ public class InternalTableImpl implements InternalTable {
                 rows,
                 tx,
                 (keyRows, txo, groupId, enlistmentConsistencyToken, full) ->
-                        
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows, txo, 
groupId, enlistmentConsistencyToken, full),
+                        readWriteMultiRowReplicaRequest(
+                                RequestType.RW_INSERT_ALL,
+                                keyRows,
+                                null,
+                                txo,
+                                groupId,
+                                enlistmentConsistencyToken,
+                                full),
                 
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
                     for (BinaryRow row : res) {
@@ -1096,6 +1103,7 @@ public class InternalTableImpl implements InternalTable {
     private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(
             RequestType requestType,
             Collection<? extends BinaryRow> rows,
+            @Nullable BitSet deleted,
             InternalTransaction tx,
             ReplicationGroupId groupId,
             Long enlistmentConsistencyToken,
@@ -1108,6 +1116,7 @@ public class InternalTableImpl implements InternalTable {
                 
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .binaryTuples(serializeBinaryTuples(rows))
+                .deleted(deleted)
                 .transactionId(tx.id())
                 .enlistmentConsistencyToken(enlistmentConsistencyToken)
                 .requestType(requestType)
@@ -1295,6 +1304,7 @@ public class InternalTableImpl implements InternalTable {
                         readWriteMultiRowReplicaRequest(
                                 RequestType.RW_DELETE_EXACT_ALL,
                                 keyRows0,
+                                null,
                                 txo,
                                 groupId,
                                 enlistmentConsistencyToken,
@@ -2129,6 +2139,20 @@ public class InternalTableImpl implements InternalTable {
     ) {
         assert serializeTablePartitionId(txo.commitPartition()) != null;
 
-        return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, 
keyRows0, txo, groupId, enlistmentConsistencyToken, full);
+        return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, 
keyRows0, null, txo, groupId, enlistmentConsistencyToken, full);
+    }
+
+    private ReplicaRequest upsertAllInternal(
+            Collection<? extends BinaryRow> keyRows0,
+            @Nullable BitSet deleted,
+            InternalTransaction txo,
+            ReplicationGroupId groupId,
+            Long enlistmentConsistencyToken,
+            boolean full
+    ) {
+        assert serializeTablePartitionId(txo.commitPartition()) != null;
+
+        return readWriteMultiRowReplicaRequest(
+                RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId, 
enlistmentConsistencyToken, full);
     }
 }

Reply via email to