This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bccdb5e4ab IGNITE-21401 DataStreamer data removal: improve
RW_UPSERT_ALL to support delete (#3148)
bccdb5e4ab is described below
commit bccdb5e4abd6204b9cb52740d0c2e61c546dab2d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Feb 5 13:08:36 2024 +0200
IGNITE-21401 DataStreamer data removal: improve RW_UPSERT_ALL to support
delete (#3148)
Add `ReadWriteMultiRowReplicaRequest#deleted` to indicate delete
operations. Initially the idea was to use key-only rows to indicate delete, but
that does not work for single-column tables, where every row is key-only.
---
.../ignite/client/fakes/FakeInternalTable.java | 2 +-
.../ignite/internal/table/ItInternalTableTest.java | 37 ++++++++++++++++
.../ignite/internal/table/InternalTable.java | 7 +--
.../internal/table/KeyValueBinaryViewImpl.java | 2 +-
.../ignite/internal/table/KeyValueViewImpl.java | 2 +-
.../internal/table/RecordBinaryViewImpl.java | 2 +-
.../ignite/internal/table/RecordViewImpl.java | 2 +-
.../request/ReadWriteMultiRowReplicaRequest.java | 10 +++++
.../replicator/PartitionReplicaListener.java | 50 ++++++++++++++++++----
.../distributed/storage/InternalTableImpl.java | 32 ++++++++++++--
10 files changed, 125 insertions(+), 21 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 39731f6565..d296ac70ff 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -185,7 +185,7 @@ public class FakeInternalTable implements InternalTable {
}
@Override
- public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int
partition) {
+ public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows,
@Nullable BitSet deleted, int partition) {
throw new UnsupportedOperationException();
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index 5eb2f5b965..b6cb3a5a38 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -32,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
@@ -472,6 +474,41 @@ public class ItInternalTableTest extends
BaseIgniteAbstractTest {
}
}
+ @Test
+ public void updateAllWithDeleteTest() {
+ InternalTable internalTable = ((TableViewInternal)
table).internalTable();
+
+ RecordView<Tuple> view = table.recordView();
+ view.upsert(null, Tuple.create().set("key", 1L).set("valInt",
1).set("valStr", "val1"));
+ view.upsert(null, Tuple.create().set("key", 3L).set("valInt",
3).set("valStr", "val3"));
+
+ // Update two existing rows (keys 1 and 3); delete a non-existent key (5), which is a no-op.
+ List<BinaryRowEx> rows = List.of(
+ createKeyValueRow(1, 11, "val11"),
+ createKeyValueRow(3, 2, "val2"),
+ createKeyRow(5)
+ );
+
+ int partitionId = internalTable.partitionId(rows.get(0));
+
+ for (int i = 0; i < rows.size(); i++) {
+ assertEquals(partitionId, internalTable.partitionId(rows.get(i)),
"Unexpected partition for row " + i);
+ }
+
+ BitSet deleted = new BitSet(3);
+ deleted.set(2);
+ internalTable.updateAll(rows, deleted, partitionId).join();
+
+ var row1 = view.get(null, Tuple.create().set("key", 1L));
+ assertEquals(11, row1.intValue("valInt"));
+
+ var row2 = view.get(null, Tuple.create().set("key", 3L));
+ assertEquals(2, row2.intValue("valInt"));
+
+ var row3 = view.get(null, Tuple.create().set("key", 5L));
+ assertNull(row3);
+ }
+
private ArrayList<BinaryRowEx>
populateEvenKeysAndPrepareEntriesToLookup(boolean keyOnly) {
KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index ad5bc18e0a..cfd274c129 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -156,13 +156,14 @@ public interface InternalTable extends ManuallyCloseable {
CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable
InternalTransaction tx);
/**
- * Asynchronously inserts records into a table, if they do not exist, or
replaces the existing ones.
+ * Asynchronously updates records in the table (insert, update, delete).
*
- * @param rows Rows to insert into the table.
+ * @param rows Rows to update.
+ * @param deleted Bit set indicating deleted rows (one bit per item in
{@code rows}). When null, no rows are deleted.
* @param partition Partition that the rows belong to.
* @return Future representing pending completion of the operation.
*/
- CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int
partition);
+ CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, @Nullable
BitSet deleted, int partition);
/**
* Asynchronously inserts a row into the table or replaces if exists and
return replaced previous row.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 60ff561d8e..8b089518d0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -545,7 +545,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
var partitioner = new
KeyValueTupleStreamerPartitionAwarenessProvider(rowConverter.registry(),
tbl.partitions());
StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender =
(partitionId, items) -> {
return withSchemaSync(null, (schemaVersion) -> {
- return this.tbl.upsertAll(marshalPairs(items, schemaVersion),
partitionId);
+ return this.tbl.updateAll(marshalPairs(items, schemaVersion),
null, partitionId);
});
};
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index d193078f71..f6c423a7c9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -690,7 +690,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId,
items) -> {
return withSchemaSync(null, (schemaVersion) -> {
- return this.tbl.upsertAll(marshalPairs(items, schemaVersion),
partitionId);
+ return this.tbl.updateAll(marshalPairs(items, schemaVersion),
null, partitionId);
});
};
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 422c99a46f..b876838ac5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -432,7 +432,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
var partitioner = new
TupleStreamerPartitionAwarenessProvider(rowConverter.registry(),
tbl.partitions());
StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items)
-> withSchemaSync(null,
- (schemaVersion) -> this.tbl.upsertAll(mapToBinary(items,
schemaVersion, false), partitionId));
+ (schemaVersion) -> this.tbl.updateAll(mapToBinary(items,
schemaVersion, false), null, partitionId));
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 92ba1f71df..c4e87750e1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -533,7 +533,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
StreamerBatchSender<R, Integer> batchSender = (partitionId, items) -> {
return withSchemaSync(null, (schemaVersion) -> {
- return this.tbl.upsertAll(marshal(items, schemaVersion),
partitionId);
+ return this.tbl.updateAll(marshal(items, schemaVersion), null,
partitionId);
});
};
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index f2476ca46b..8eb899ead6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.util.BitSet;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.jetbrains.annotations.Nullable;
/**
* Read-write multi-row replica request.
@@ -31,4 +33,12 @@ public interface ReadWriteMultiRowReplicaRequest extends
MultipleRowReplicaReque
* @return {@code True} to disable the delayed ack optimization.
*/
boolean skipDelayedAck();
+
+ /**
+ * Deleted flags (one for every tuple in {@link #binaryTuples()}).
+ *
+ * @return A bit for every tuple in {@link #binaryTuples()} indicating a
delete operation.
+ */
+ @Nullable
+ BitSet deleted();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 728268da12..198915d783 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -51,6 +51,7 @@ import static
org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
@@ -2282,11 +2283,23 @@ public class PartitionReplicaListener implements
ReplicaListener {
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[]
rowIdFuts = new CompletableFuture[searchRows.size()];
Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+ BitSet deleted = request.deleted();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow searchRow = searchRows.get(i);
- rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId,
(rowId, row, lastCommitTime) -> {
+ boolean isDelete = deleted != null && deleted.get(i);
+
+ BinaryTuple pk = isDelete
+ ? resolvePk(searchRow.tupleSlice())
+ : extractPk(searchRow);
+
+ rowIdFuts[i] = resolveRowByPk(pk, txId, (rowId, row,
lastCommitTime) -> {
+ if (isDelete && rowId == null) {
+ // Does not exist, nothing to delete.
+ return nullCompletedFuture();
+ }
+
boolean insert = rowId == null;
RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
@@ -2295,6 +2308,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
lastCommitTimes.put(rowId.uuid(), lastCommitTime);
}
+ if (isDelete) {
+ assert row != null;
+
+ return takeLocksForDelete(row, rowId0, txId)
+ .thenApply(id -> new IgniteBiTuple<>(id,
null));
+ }
+
return insert
? takeLocksForInsert(searchRow, rowId0, txId)
: takeLocksForUpdate(searchRow, rowId0, txId);
@@ -2306,13 +2326,21 @@ public class PartitionReplicaListener implements
ReplicaListener {
List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
- RowId lockedRow = rowIdFuts[i].join().get1();
+ IgniteBiTuple<RowId, Collection<Lock>> locks =
rowIdFuts[i].join();
+ if (locks == null) {
+ continue;
+ }
+
+ RowId lockedRow = locks.get1();
+
+ TimedBinaryRowMessageBuilder
timedBinaryRowMessageBuilder = MSG_FACTORY.timedBinaryRowMessage()
+
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())));
+
+ if (deleted == null || !deleted.get(i)) {
+
timedBinaryRowMessageBuilder.binaryRowMessage(binaryRowMessage(searchRows.get(i)));
+ }
- rowsToUpdate.put(lockedRow.uuid(),
- MSG_FACTORY.timedBinaryRowMessage()
-
.binaryRowMessage(binaryRowMessage(searchRows.get(i)))
-
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())))
- .build());
+ rowsToUpdate.put(lockedRow.uuid(),
timedBinaryRowMessageBuilder.build());
rows.add(lockedRow);
}
@@ -2333,8 +2361,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenApply(res -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId,
Collection<Lock>>> rowIdFut : rowIdFuts) {
- rowIdFut.join().get2()
- .forEach(lock ->
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
+ IgniteBiTuple<RowId, Collection<Lock>>
futRes = rowIdFut.join();
+ Collection<Lock> locks = futRes == null ?
null : futRes.get2();
+
+ if (locks != null) {
+ locks.forEach(lock ->
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
+ }
}
return new ReplicaResult(null, res);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 01e463165c..3e91a11422 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1008,14 +1008,14 @@ public class InternalTableImpl implements InternalTable
{
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int
partition) {
+ public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows,
@Nullable BitSet deleted, int partition) {
InternalTransaction tx = txManager.begin(observableTimestampTracker);
TablePartitionId partGroupId = new TablePartitionId(tableId,
partition);
CompletableFuture<Void> fut = enlistWithRetry(
tx,
partition,
- enlistmentConsistencyToken -> upsertAllInternal(rows, tx,
partGroupId, enlistmentConsistencyToken, true),
+ enlistmentConsistencyToken -> upsertAllInternal(rows, deleted,
tx, partGroupId, enlistmentConsistencyToken, true),
true,
null,
true // Allow auto retries for data streamer.
@@ -1077,7 +1077,14 @@ public class InternalTableImpl implements InternalTable {
rows,
tx,
(keyRows, txo, groupId, enlistmentConsistencyToken, full) ->
-
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows, txo,
groupId, enlistmentConsistencyToken, full),
+ readWriteMultiRowReplicaRequest(
+ RequestType.RW_INSERT_ALL,
+ keyRows,
+ null,
+ txo,
+ groupId,
+ enlistmentConsistencyToken,
+ full),
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1096,6 +1103,7 @@ public class InternalTableImpl implements InternalTable {
private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(
RequestType requestType,
Collection<? extends BinaryRow> rows,
+ @Nullable BitSet deleted,
InternalTransaction tx,
ReplicationGroupId groupId,
Long enlistmentConsistencyToken,
@@ -1108,6 +1116,7 @@ public class InternalTableImpl implements InternalTable {
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
.schemaVersion(rows.iterator().next().schemaVersion())
.binaryTuples(serializeBinaryTuples(rows))
+ .deleted(deleted)
.transactionId(tx.id())
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(requestType)
@@ -1295,6 +1304,7 @@ public class InternalTableImpl implements InternalTable {
readWriteMultiRowReplicaRequest(
RequestType.RW_DELETE_EXACT_ALL,
keyRows0,
+ null,
txo,
groupId,
enlistmentConsistencyToken,
@@ -2129,6 +2139,20 @@ public class InternalTableImpl implements InternalTable {
) {
assert serializeTablePartitionId(txo.commitPartition()) != null;
- return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL,
keyRows0, txo, groupId, enlistmentConsistencyToken, full);
+ return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL,
keyRows0, null, txo, groupId, enlistmentConsistencyToken, full);
+ }
+
+ private ReplicaRequest upsertAllInternal(
+ Collection<? extends BinaryRow> keyRows0,
+ @Nullable BitSet deleted,
+ InternalTransaction txo,
+ ReplicationGroupId groupId,
+ Long enlistmentConsistencyToken,
+ boolean full
+ ) {
+ assert serializeTablePartitionId(txo.commitPartition()) != null;
+
+ return readWriteMultiRowReplicaRequest(
+ RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId,
enlistmentConsistencyToken, full);
}
}