This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 48b6ebb981 IGNITE-22678 Java thin: fix ClientPartitionManager cache
invalidation (#4175)
48b6ebb981 is described below
commit 48b6ebb981959460abf23ae8771d1a00101868af
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Aug 5 16:25:33 2024 +0300
IGNITE-22678 Java thin: fix ClientPartitionManager cache invalidation
(#4175)
* Remove hardcoded one-minute cache timeout
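* Use `ReliableChannel.partitionAssignmentTimestamp` to invalidate the
cache: cached entries are dropped once the channel's timestamp is newer than the one stored with the last cache update (relies on `PARTITION_ASSIGNMENT_FLAG` response flag)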
---
.../ignite/client/handler/FakePlacementDriver.java | 4 ++
.../client/table/ClientPartitionManager.java | 27 +++++---
.../apache/ignite/client/AbstractClientTest.java | 6 +-
.../ignite/client/ClientPartitionManagerTest.java | 80 ++++++++++++++++++++++
.../ignite/client/PartitionAwarenessTest.java | 2 +-
.../org/apache/ignite/client/ReconnectTest.java | 7 +-
.../java/org/apache/ignite/client/TestServer.java | 18 +++--
.../client/benchmarks/ClientPutGetBenchmark.java | 3 +-
.../org/apache/ignite/client/fakes/FakeIgnite.java | 9 ++-
.../ignite/client/fakes/FakeIgniteTables.java | 8 ++-
.../ignite/client/fakes/FakeInternalTable.java | 22 +++++-
11 files changed, 150 insertions(+), 36 deletions(-)
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 62c0fc17e8..54da3e4648 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -113,6 +113,10 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
}
+ public List<ReplicaMeta> primaryReplicas() {
+ return primaryReplicas;
+ }
+
private static ReplicaMeta getReplicaMeta(String leaseholder, long
leaseStartTime) {
//noinspection serial
return new ReplicaMeta() {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
index 1e07c62174..7efa4999fe 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -21,8 +21,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.client.TcpIgniteClient.unpackClusterNode;
import static
org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -42,14 +40,14 @@ import org.jetbrains.annotations.Nullable;
/**
* Client partition manager implementation.
*/
-public class ClientPartitionManager implements PartitionManager {
+class ClientPartitionManager implements PartitionManager {
private final ClientTable tbl;
private final Lock lock = new ReentrantLock();
private final Map<Partition, ClusterNode> cache = new HashMap<>();
- private @Nullable Instant aliveUntil;
+ private long assignmentChangeTimestamp;
ClientPartitionManager(ClientTable clientTable) {
this.tbl = clientTable;
@@ -80,6 +78,8 @@ public class ClientPartitionManager implements
PartitionManager {
return completedFuture(cache);
}
+ var currentTs = tbl.channel().partitionAssignmentTimestamp();
+
return tbl.channel().serviceAsync(ClientOp.PRIMARY_REPLICAS_GET,
w -> w.out().packInt(tbl.tableId()),
r -> {
@@ -95,7 +95,7 @@ public class ClientPartitionManager implements
PartitionManager {
return res;
})
- .thenApply(this::updateCache);
+ .thenApply(map -> updateCache(map, currentTs));
}
@Override
@@ -115,12 +115,14 @@ public class ClientPartitionManager implements
PartitionManager {
private @Nullable ClusterNode getClusterNode(Partition partition) {
lock.lock();
+
try {
- if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+ //noinspection resource
+ if (tbl.channel().partitionAssignmentTimestamp() >
assignmentChangeTimestamp) {
cache.clear();
- aliveUntil = null;
return null;
}
+
return cache.get(partition);
} finally {
lock.unlock();
@@ -129,23 +131,26 @@ public class ClientPartitionManager implements
PartitionManager {
private @Nullable Map<Partition, ClusterNode> lookupCache() {
lock.lock();
+
try {
- if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+ //noinspection resource
+ if (tbl.channel().partitionAssignmentTimestamp() >
assignmentChangeTimestamp) {
cache.clear();
- aliveUntil = null;
return null;
}
+
return Map.copyOf(cache);
} finally {
lock.unlock();
}
}
- private Map<Partition, ClusterNode> updateCache(Map<Partition,
ClusterNode> map) {
+ private Map<Partition, ClusterNode> updateCache(Map<Partition,
ClusterNode> map, long timestamp) {
lock.lock();
+
try {
cache.putAll(map);
- aliveUntil = Instant.now().plus(1, ChronoUnit.MINUTES);
+ assignmentChangeTimestamp = timestamp;
return map;
} finally {
lock.unlock();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 3c938a6b87..3de754b39e 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractClientTest extends
BaseIgniteAbstractTest {
protected static TestServer testServer;
- protected static Ignite server;
+ protected static FakeIgnite server;
protected static IgniteClient client;
@@ -116,7 +116,7 @@ public abstract class AbstractClientTest extends
BaseIgniteAbstractTest {
*/
public static TestServer startServer(
long idleTimeout,
- Ignite ignite
+ FakeIgnite ignite
) {
return startServer(idleTimeout, ignite, null);
}
@@ -131,7 +131,7 @@ public abstract class AbstractClientTest extends
BaseIgniteAbstractTest {
*/
public static TestServer startServer(
long idleTimeout,
- Ignite ignite,
+ FakeIgnite ignite,
String nodeName
) {
return new TestServer(idleTimeout, ignite, null, null, nodeName,
clusterId, null, null);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
new file mode 100644
index 0000000000..8cae25386e
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.client.handler.FakePlacementDriver;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for client partition manager.
+ */
+public class ClientPartitionManagerTest extends AbstractClientTest {
+ private static final String TABLE_NAME = "tbl1";
+
+ private static int tableId;
+
+ @BeforeEach
+ public void setUp() {
+ Table table = ((FakeIgniteTables)
server.tables()).createTable(TABLE_NAME);
+
+ tableId = ((TableViewInternal) table).tableId();
+ }
+
+ @Test
+ public void testPrimaryReplicasCacheInvalidation() {
+ Table table = client.tables().table(TABLE_NAME);
+ PartitionManager partMgr = table.partitionManager();
+ HashPartition part0 = new HashPartition(0);
+ HashPartition part2 = new HashPartition(2);
+
+ // Before update.
+ Map<Partition, ClusterNode> map =
partMgr.primaryReplicasAsync().join();
+ assertEquals(4, map.size());
+ assertEquals("s", map.get(part0).name());
+ assertEquals("s", partMgr.primaryReplicaAsync(part2).join().name());
+
+ // Update.
+ updateServerReplicas(List.of("foo", "bar", "baz", "qux"));
+ client.tables().tables(); // Perform a request to trigger cache
invalidation.
+
+ // After update.
+ Map<Partition, ClusterNode> map2 =
partMgr.primaryReplicasAsync().join();
+ assertEquals(4, map2.size());
+ assertEquals("foo", map2.get(part0).name());
+ assertEquals("baz", partMgr.primaryReplicaAsync(part2).join().name());
+ }
+
+ private static void updateServerReplicas(List<String> replicas) {
+ FakePlacementDriver placementDriver = testServer.placementDriver();
+ long leaseStartTime = new HybridClockImpl().nowLong();
+ placementDriver.setReplicas(replicas, tableId, leaseStartTime);
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 5a1120fe32..84a9f4daf7 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -81,7 +81,7 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
private static TestServer testServer2;
- private static Ignite server2;
+ private static FakeIgnite server2;
private static IgniteClient client2;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index a11bd877c2..6ff0c901a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -23,7 +23,6 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
@@ -109,7 +108,7 @@ public class ReconnectTest extends BaseIgniteAbstractTest {
server2.close();
waitForConnections(client, 1);
- Ignite ignite = new FakeIgnite();
+ FakeIgnite ignite = new FakeIgnite();
server2 = new TestServer(0, ignite, null, null, "node3",
AbstractClientTest.clusterId, null, 10903);
if (reconnectEnabled) {
@@ -145,9 +144,9 @@ public class ReconnectTest extends BaseIgniteAbstractTest {
}
private void startTwoServers() {
- Ignite ignite = new FakeIgnite();
+ FakeIgnite ignite = new FakeIgnite();
server = new TestServer(0, ignite, null, null, "node1",
AbstractClientTest.clusterId, null, 10901);
- Ignite ignite1 = new FakeIgnite();
+ FakeIgnite ignite1 = new FakeIgnite();
server2 = new TestServer(0, ignite1, null, null, "node2",
AbstractClientTest.clusterId, null, 10902);
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index f545f1e71c..015621ebd7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -92,9 +92,7 @@ public class TestServer implements AutoCloseable {
private final AuthenticationManager authenticationManager;
- private final Ignite ignite;
-
- private final FakePlacementDriver placementDriver = new
FakePlacementDriver(FakeInternalTable.PARTITIONS);
+ private final FakeIgnite ignite;
/**
* Constructor.
@@ -104,7 +102,7 @@ public class TestServer implements AutoCloseable {
*/
public TestServer(
long idleTimeout,
- Ignite ignite
+ FakeIgnite ignite
) {
this(
idleTimeout,
@@ -123,7 +121,7 @@ public class TestServer implements AutoCloseable {
*/
public TestServer(
long idleTimeout,
- Ignite ignite,
+ FakeIgnite ignite,
@Nullable Function<Integer, Boolean> shouldDropConnection,
@Nullable Function<Integer, Integer> responseDelay,
@Nullable String nodeName,
@@ -153,7 +151,7 @@ public class TestServer implements AutoCloseable {
*/
public TestServer(
long idleTimeout,
- Ignite ignite,
+ FakeIgnite ignite,
@Nullable Function<Integer, Boolean> shouldDropConnection,
@Nullable Function<Integer, Integer> responseDelay,
@Nullable String nodeName,
@@ -234,10 +232,10 @@ public class TestServer implements AutoCloseable {
metrics,
authenticationManager,
clock,
- placementDriver,
+ ignite.placementDriver(),
clientConnectorConfiguration)
: new ClientHandlerModule(
- ((FakeIgnite) ignite).queryEngine(),
+ ignite.queryEngine(),
(IgniteTablesInternal) ignite.tables(),
(IgniteTransactionsImpl) ignite.transactions(),
(IgniteComputeInternal) ignite.compute(),
@@ -250,7 +248,7 @@ public class TestServer implements AutoCloseable {
new TestClockService(clock),
new AlwaysSyncedSchemaSyncService(),
new FakeCatalogService(FakeInternalTable.PARTITIONS),
- placementDriver,
+ ignite.placementDriver(),
clientConnectorConfiguration,
new TestLowWatermark()
);
@@ -317,7 +315,7 @@ public class TestServer implements AutoCloseable {
* @return Placement driver.
*/
public FakePlacementDriver placementDriver() {
- return placementDriver;
+ return ignite.placementDriver();
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
b/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
index 64656cf17b..677d5b172a 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.benchmarks;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
-import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.TestServer;
import org.apache.ignite.client.fakes.FakeIgnite;
@@ -67,7 +66,7 @@ public class ClientPutGetBenchmark {
private TestServer testServer;
- private Ignite ignite;
+ private FakeIgnite ignite;
private IgniteClient client;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 1d6678d73c..37d1ea2924 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.catalog.IgniteCatalog;
+import org.apache.ignite.client.handler.FakePlacementDriver;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -50,6 +51,8 @@ public class FakeIgnite implements Ignite {
private final IgniteTables tables;
+ private final FakePlacementDriver placementDriver = new
FakePlacementDriver(FakeInternalTable.PARTITIONS);
+
/**
* Default constructor.
*/
@@ -65,7 +68,7 @@ public class FakeIgnite implements Ignite {
public FakeIgnite(String name) {
this.name = name;
this.compute = new FakeCompute(name, this);
- this.tables = new FakeIgniteTables(compute);
+ this.tables = new FakeIgniteTables(compute, placementDriver);
}
/** {@inheritDoc} */
@@ -122,4 +125,8 @@ public class FakeIgnite implements Ignite {
public HybridTimestampTracker timestampTracker() {
return hybridTimestampTracker;
}
+
+ public FakePlacementDriver placementDriver() {
+ return placementDriver;
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 962bdf60ea..20f4d38c2a 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import org.apache.ignite.client.handler.FakePlacementDriver;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -73,8 +74,11 @@ public class FakeIgniteTables implements
IgniteTablesInternal {
private final IgniteCompute compute;
- FakeIgniteTables(IgniteCompute compute) {
+ private final FakePlacementDriver placementDriver;
+
+ FakeIgniteTables(IgniteCompute compute, FakePlacementDriver
placementDriver) {
this.compute = compute;
+ this.placementDriver = placementDriver;
}
/**
@@ -220,7 +224,7 @@ public class FakeIgniteTables implements
IgniteTablesInternal {
};
return new TableImpl(
- new FakeInternalTable(name, id, keyExtractor, compute),
+ new FakeInternalTable(name, id, keyExtractor, compute,
placementDriver),
schemaReg,
new HeapLockManager(),
new SchemaVersions() {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2789d783dc..eb3bc3c6ee 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import javax.naming.OperationNotSupportedException;
+import org.apache.ignite.client.handler.FakePlacementDriver;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
@@ -43,6 +44,7 @@ import
org.apache.ignite.internal.compute.streamer.StreamerReceiverJob;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -80,6 +82,8 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
private final IgniteCompute compute;
+ private final FakePlacementDriver placementDriver;
+
/** Data access listener. */
private BiConsumer<String, Object> dataAccessListener;
@@ -89,12 +93,19 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
* @param tableName Name.
* @param tableId Id.
* @param keyExtractor Function which converts given binary row to an
index key.
+ * @param placementDriver Placement driver.
*/
- FakeInternalTable(String tableName, int tableId, ColumnsExtractor
keyExtractor, IgniteCompute compute) {
+ FakeInternalTable(
+ String tableName,
+ int tableId,
+ ColumnsExtractor keyExtractor,
+ IgniteCompute compute,
+ FakePlacementDriver placementDriver) {
this.tableName = tableName;
this.tableId = tableId;
this.keyExtractor = keyExtractor;
this.compute = compute;
+ this.placementDriver = placementDriver;
}
@Override
@@ -506,8 +517,15 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
@Override
public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId
partitionId) {
+ List<ReplicaMeta> replicaMetas = placementDriver.primaryReplicas();
+ ReplicaMeta replica = replicaMetas.get(partitionId.partitionId());
+
+ //noinspection DataFlowIssue
return completedFuture(
- new ClusterNodeImpl("server-1", "server-1", new
NetworkAddress("localhost", 10800)));
+ new ClusterNodeImpl(
+ replica.getLeaseholderId(),
+ replica.getLeaseholder(),
+ new NetworkAddress("localhost", 10800)));
}
@Override