This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c97d7c5db3 IGNITE-21247 Log enhancements for LeaseUpdater (#3109)
c97d7c5db3 is described below
commit c97d7c5db327583b82eeca1165cb4eea21a3b3a2
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Feb 7 12:13:06 2024 +0300
IGNITE-21247 Log enhancements for LeaseUpdater (#3109)
---
.../internal/tostring/IgniteToStringBuilder.java | 52 +++++++++++-
.../internal/placementdriver/LeaseUpdater.java | 99 +++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 19 ++++-
3 files changed, 163 insertions(+), 7 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
index 86a0e052db..6d8c88d6b7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
@@ -54,6 +54,7 @@ import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.lang.IgniteTriConsumer;
import org.jetbrains.annotations.Nullable;
/**
@@ -1450,6 +1451,53 @@ public class IgniteToStringBuilder {
}
}
+ /**
+ * Produces a string representation of a list with customized string representation of elements.
+ *
+ * <p>The result starts with {@code " ["} and ends with {@code ']'}; elements are written in list order and separated
+ * by {@code ','}. Writing stops once {@code COLLECTION_LIMIT} elements have been written, or as many elements as the
+ * list held when this method was entered, whichever comes first. If a {@code ConcurrentModificationException} is
+ * thrown while the elements are being written, a note with the number of written elements and the list size is
+ * appended and the overflow handling is skipped.
+ *
+ * @param list List.
+ * @param elementToString Element to string transformer, accepts the string builder, element of list and its index.
+ * @param <T> Type of list elements.
+ * @return String.
+ */
+ public static <T> String toString(List<T> list,
IgniteTriConsumer<IgniteStringBuilder, T, Integer> elementToString) {
+ int listSize = list.size();
+
+ IgniteStringBuilder buf = new IgniteStringBuilder();
+
+ buf.app(" [");
+
+ int cnt = 0;
+ boolean needHandleOverflow = true;
+
+ try {
+ for (int i = 0; i < list.size(); i++) {
+ if (i > 0) {
+ buf.app(',');
+ }
+
+ T el = list.get(i);
+
+ elementToString.accept(buf, el, i);
+
+ if (++cnt == COLLECTION_LIMIT || cnt == listSize) {
+ break;
+ }
+ }
+ } catch (ConcurrentModificationException e) {
+ handleConcurrentModification(buf, cnt, listSize);
+
+ needHandleOverflow = false;
+ }
+
+ if (needHandleOverflow) {
+ handleOverflow(buf, listSize);
+ }
+
+ buf.app(']');
+
+ return buf.toString();
+ }
+
/**
* Writes array to buffer.
*
@@ -1583,7 +1631,7 @@ public class IgniteToStringBuilder {
* @param buf String builder buffer.
* @param size Size to compare with limit.
*/
- private static void handleOverflow(StringBuilderLimitedLength buf, int
size) {
+ private static void handleOverflow(IgniteStringBuilder buf, int size) {
int overflow = size - COLLECTION_LIMIT;
if (overflow > 0) {
@@ -1598,7 +1646,7 @@ public class IgniteToStringBuilder {
* @param writtenElements Number of elements successfully written to
output.
* @param size Overall size of collection.
*/
- private static void
handleConcurrentModification(StringBuilderLimitedLength buf, int
writtenElements, int size) {
+ private static void handleConcurrentModification(IgniteStringBuilder buf,
int writtenElements, int size) {
buf.app("... concurrent modification was detected,
").app(writtenElements).app(" out of ").app(size)
.app(" were written");
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index bf471c266c..6d0762659b 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -56,6 +56,8 @@ import
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement;
import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -64,6 +66,10 @@ import org.jetbrains.annotations.Nullable;
* A processor to manage leases. The process is started when placement driver
activates and stopped when it deactivates.
*/
public class LeaseUpdater {
+ /** Negative value means that printing statistics is disabled. */
+ private static final int LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS
= IgniteSystemProperties
+ .getInteger("LEASE_STATISTICS_PRINT_ONCE_PER_ITERATIONS", 10);
+
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(LeaseUpdater.class);
@@ -275,6 +281,11 @@ public class LeaseUpdater {
/** Runnable to update lease in Meta storage. */
private class Updater implements Runnable {
+ private LeaseStats leaseUpdateStatistics = new LeaseStats();
+
+ /** This field should be accessed only from updater thread. */
+ private int statisticsLogCounter;
+
@Override
public void run() {
while (active() && !Thread.interrupted()) {
@@ -286,6 +297,13 @@ public class LeaseUpdater {
if (active()) {
updateLeaseBatchInternal();
}
+ } catch (Throwable e) {
+ LOG.error("Error occurred when updating the leases.", e);
+
+ if (e instanceof Error) {
+ // TODO IGNITE-20368 The node should be halted in case
of an error here.
+ throw (Error) e;
+ }
} finally {
stateChangingLock.leaveBusy();
}
@@ -302,6 +320,8 @@ public class LeaseUpdater {
private void updateLeaseBatchInternal() {
HybridTimestamp now = clock.now();
+ leaseUpdateStatistics = new LeaseStats();
+
long outdatedLeaseThreshold = now.getPhysical() + LEASE_INTERVAL /
2;
Leases leasesCurrent = leaseTracker.leasesCurrent();
@@ -315,11 +335,18 @@ public class LeaseUpdater {
renewedLeases.entrySet().removeIf(e ->
e.getValue().getExpirationTime().before(now)
&&
!currentAssignmentsReplicationGroupIds.contains(e.getKey()));
+ int currentAssignmentsSize = currentAssignments.size();
+ int activeLeasesCount = 0;
+
for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry :
currentAssignments.entrySet()) {
ReplicationGroupId grpId = entry.getKey();
Lease lease = leaseTracker.getLease(grpId);
+ if (lease.isAccepted() && !isLeaseOutdated(lease)) {
+ activeLeasesCount++;
+ }
+
if (!lease.isAccepted()) {
LeaseAgreement agreement =
leaseNegotiator.negotiated(grpId);
@@ -332,6 +359,8 @@ public class LeaseUpdater {
ClusterNode candidate =
nextLeaseHolder(entry.getValue(), agreement.getRedirectTo());
if (candidate == null) {
+ leaseUpdateStatistics.onLeaseWithoutCandidate();
+
continue;
}
@@ -350,6 +379,8 @@ public class LeaseUpdater {
);
if (candidate == null) {
+ leaseUpdateStatistics.onLeaseWithoutCandidate();
+
continue;
}
@@ -368,7 +399,18 @@ public class LeaseUpdater {
byte[] renewedValue = new
LeaseBatch(renewedLeases.values()).bytes();
- var key = PLACEMENTDRIVER_LEASES_KEY;
+ ByteArray key = PLACEMENTDRIVER_LEASES_KEY;
+
+ if (shouldLogLeaseStatistics()) {
+ LOG.info(
+ "Leases updated (printed once per {} iteration(s)):
[inCurrentIteration={}, active={}, "
+ + "currentAssignmentsSize={}].",
+ LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS,
+ leaseUpdateStatistics,
+ activeLeasesCount,
+ currentAssignmentsSize
+ );
+ }
msManager.invoke(
or(notExists(key),
value(key).eq(leasesCurrent.leasesBytes())),
@@ -416,6 +458,8 @@ public class LeaseUpdater {
renewedLeases.put(grpId, renewedLease);
toBeNegotiated.put(grpId, !lease.isAccepted() &&
Objects.equals(lease.getLeaseholder(), candidate.name()));
+
+ leaseUpdateStatistics.onLeaseCreate();
}
/**
@@ -430,6 +474,8 @@ public class LeaseUpdater {
Lease renewedLease = lease.prolongLease(newTs);
renewedLeases.put(grpId, renewedLease);
+
+ leaseUpdateStatistics.onLeaseProlong();
}
/**
@@ -445,6 +491,8 @@ public class LeaseUpdater {
Lease renewedLease = lease.acceptLease(newTs);
renewedLeases.put(grpId, renewedLease);
+
+ leaseUpdateStatistics.onLeasePublish();
}
/**
@@ -459,6 +507,55 @@ public class LeaseUpdater {
return now.after(lease.getExpirationTime());
}
+
+        /**
+         * Decides whether the lease statistics should be logged on the current update iteration. Every call counts as
+         * one iteration; the counter is a plain field, so this method must be called only from the updater thread.
+         *
+         * @return {@code true} on every N-th call, where N is {@code LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS}
+         *         (on every call when N is {@code 0} or {@code 1}); always {@code false} when N is negative.
+         */
+        private boolean shouldLogLeaseStatistics() {
+            if (LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS < 0) {
+                return false;
+            }
+
+            // ">=" makes a value of N yield one print per N iterations, which is what the log message states;
+            // a strict ">" would yield one print per N + 1 iterations.
+            boolean result = ++statisticsLogCounter >= LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS;
+
+            if (result) {
+                statisticsLogCounter = 0;
+            }
+
+            return result;
+        }
+ }
+
+    /**
+     * Counters of the lease operations performed by the lease updater. The counters are plain {@code int} fields that
+     * are incremented without any synchronization.
+     */
+    private static class LeaseStats {
+        /** Number of created leases. */
+        @IgniteToStringInclude
+        private int leasesCreated;
+
+        /** Number of published leases. */
+        @IgniteToStringInclude
+        private int leasesPublished;
+
+        /** Number of prolonged leases. */
+        @IgniteToStringInclude
+        private int leasesProlonged;
+
+        /** Number of leases for which no candidate was found. */
+        @IgniteToStringInclude
+        private int leasesWithoutCandidates;
+
+        /** Registers a created lease. */
+        private void onLeaseCreate() {
+            leasesCreated++;
+        }
+
+        /** Registers a published lease. */
+        private void onLeasePublish() {
+            leasesPublished++;
+        }
+
+        /** Registers a prolonged lease. */
+        private void onLeaseProlong() {
+            leasesProlonged++;
+        }
+
+        /** Registers a lease that has no candidate. */
+        private void onLeaseWithoutCandidate() {
+            leasesWithoutCandidates++;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
     }
/** Message handler to process notification from replica side. */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 46fd7f31ab..f3c58b4d49 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -170,6 +170,7 @@ import
org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
import
org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -652,8 +653,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
.invoke(condition, partitionAssignments,
Collections.emptyList())
.thenCompose(invokeResult -> {
if (invokeResult) {
- LOG.info(IgniteStringFormatter.format("Assignments
calculated from data nodes are successfully "
- + "written to meta storage
[tableId={}, assignments={}]", tableId, newAssignments));
+ LOG.info(IgniteStringFormatter.format("Assignments
calculated from data nodes are successfully written"
+ + " to meta storage [tableId={},
assignments={}]", tableId, assignmentListToString(newAssignments)));
return completedFuture(newAssignments);
} else {
@@ -679,7 +680,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage
[tableId={}, "
- + "assignments={}]", tableId,
realAssignments));
+ + "assignments={}]", tableId,
assignmentListToString(realAssignments)));
return realAssignments;
});
@@ -1167,7 +1168,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
assignmentsFuture.thenAccept(assignmentsList -> {
LOG.info(IgniteStringFormatter.format("Assignments
calculated from data nodes [table={}, tableId={}, assignments={}, "
- + "revision={}]", tableDescriptor.name(),
tableId, assignmentsList, causalityToken));
+ + "revision={}]", tableDescriptor.name(), tableId,
assignmentListToString(assignmentsList), causalityToken));
});
}
@@ -1303,6 +1304,16 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return createPartsFut.thenApply(ignore -> null);
}
+    /**
+     * Renders the given list of assignment sets as a string for log messages. Every set is printed as
+     * {@code index=set}, where {@code index} is the position of the set in the list.
+     *
+     * @param assignments List of assignments.
+     * @return String representation of the given assignments list to use it for logging.
+     */
+    private static String assignmentListToString(List<Set<Assignment>> assignments) {
+        return S.toString(assignments, (builder, assignmentSet, index) -> {
+            builder.app(index).app('=').app(assignmentSet);
+        });
+    }
+
/**
* Creates data storage for the provided table.
*