This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new a1de795 [NO ISSUE][REP] Add replica sync progress a1de795 is described below commit a1de795e82273b128b706794988fb7dd09a0267d Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Thu Sep 2 13:35:49 2021 +0300 [NO ISSUE][REP] Add replica sync progress - user model changes: no - storage format changes: no - interface changes: yes Details: - Add replica sync progress based on the replica missing files. - Add replica last progress timestamp that can be used to determine replica progress inactivity. Change-Id: Iab2cd7e745c4150e2d0aef3af864ec0f66dd96e7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13063 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../common/replication/IPartitionReplica.java | 14 +++++++++ .../asterix/replication/api/PartitionReplica.java | 36 ++++++++++++++++++++++ .../replication/sync/ReplicaFilesSynchronizer.java | 7 ++++- 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java index 761b2c6..f311655 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java @@ -51,4 +51,18 @@ public interface IPartitionReplica { * @param failure */ void notifyFailure(Exception failure); + + /** + * Gets the current sync progress + * + * @return the current sync progress + */ + double getSyncProgress(); + + /** + * Gets the last progress time of this replica based on System.nanoTime + * + * @return the last progress time + */ + long getLastProgressTime(); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index 282d475..e265d03 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -52,6 +52,8 @@ public class PartitionReplica implements IPartitionReplica { private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); private final INcApplicationContext appCtx; private final ReplicaIdentifier id; + private double syncProgress = -1; + private long lastProgressTime = -1; private ByteBuffer reusbaleBuf; private PartitionReplicaStatus status = DISCONNECTED; private ISocketChannel sc; @@ -133,6 +135,16 @@ public class PartitionReplica implements IPartitionReplica { return reusbaleBuf; } + public synchronized void setSyncProgress(double syncProgress) { + this.syncProgress = syncProgress; + lastProgressTime = System.nanoTime(); + } + + @Override + public synchronized double getSyncProgress() { + return syncProgress; + } + private JsonNode asJson() { ObjectNode json = OBJECT_MAPPER.createObjectNode(); json.put("id", id.toString()); @@ -153,6 +165,19 @@ public class PartitionReplica implements IPartitionReplica { } @Override + public synchronized long getLastProgressTime() { + switch (status) { + case IN_SYNC: + return System.nanoTime(); + case CATCHING_UP: + return lastProgressTime; + case DISCONNECTED: + return -1; + } + return -1; + } + + @Override public int hashCode() { return id.hashCode(); } @@ -172,6 +197,17 @@ public class PartitionReplica implements IPartitionReplica { } LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status); this.status = status; + switch (status) { + case IN_SYNC: + syncProgress = 1; + break; + case CATCHING_UP: + lastProgressTime = System.nanoTime(); + break; + case DISCONNECTED: + syncProgress = -1; + break; + } } private void sendGoodBye() { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 3c93d17..faf3f54 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -105,7 +105,12 @@ public class ReplicaFilesSynchronizer { final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); // sort files to ensure index metadata files starting with "." are replicated first files.sort(String::compareTo); - files.forEach(sync::replicate); + int missingFilesCount = files.size(); + for (int i = 0; i < missingFilesCount; i++) { + String file = files.get(i); + sync.replicate(file); + replica.setSyncProgress((i + 1d) / missingFilesCount); + } } private void deleteInvalidFiles(List<String> files) {