This is an automated email from the ASF dual-hosted git repository. anovikov pushed a commit to branch ignite-18302 in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit d0f27f9a16c2605b3eeefedbd2896463adf941f4 Author: Andrey Novikov <[email protected]> AuthorDate: Fri Sep 1 11:38:41 2023 +0700 IGNITE-18302 Add support for different save modes. --- .../sessions/IgniteIndexedSessionRepository.java | 75 +++++++++++++++++++--- .../ignite/spring/sessions/IgniteSession.java | 71 +++++++++++++------- .../spring/sessions/proxy/ClientSessionProxy.java | 5 ++ .../spring/sessions/proxy/IgniteSessionProxy.java | 5 ++ .../ignite/spring/sessions/proxy/SessionProxy.java | 9 +++ 5 files changed, 133 insertions(+), 32 deletions(-) diff --git a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java index 4d83f0b5..25a3e315 100644 --- a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java +++ b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java @@ -93,6 +93,13 @@ public class IgniteIndexedSessionRepository */ public static final String DEFAULT_SESSION_MAP_NAME = "spring:session:sessions"; + /** + * Maximum of attempts for atomicity replace. If something wrong with IgniteSession, old value can never be equal to + * value from repository. In this case replace will never end the loop. If this value is exceeded, then plain + * {@link SessionProxy#replace(String, IgniteSession)} will be used. + */ + private static final int MAX_UPDATE_ATTEMPT = 100; + /** */ private static final Log logger = LogFactory.getLog(IgniteIndexedSessionRepository.class); @@ -109,7 +116,7 @@ public class IgniteIndexedSessionRepository private FlushMode flushMode = FlushMode.ON_SAVE; /** The save mode. */ - private SaveMode saveMode = SaveMode.ON_SET_ATTRIBUTE; + private SaveMode saveMode = SaveMode.ALWAYS; /** The index resolver. */ private IndexResolver<Session> idxResolver = new DelegatingIndexResolver<>(new PrincipalNameIndexResolver<>()); @@ -173,8 +180,8 @@ public class IgniteIndexedSessionRepository /** * Set the maximum inactive interval in seconds between requests before newly created - * sessions will be invalidated. A negative time indicates that the session will never - * timeout. The default is 1800 (30 minutes). + * sessions will be invalidated. A negative time indicates that the session will never timeout. + * The default is 1800 (30 minutes). * @param dfltMaxInactiveInterval the maximum inactive interval in seconds */ public void setDefaultMaxInactiveInterval(Integer dfltMaxInactiveInterval) { @@ -223,14 +230,22 @@ public class IgniteIndexedSessionRepository @Override public void save(IgniteSession ses) { if (ses.isNew()) ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), ses); - else if (ses.hasChangedSessionId()) { - String originalId = ses.getDelegate().getOriginalId(); + else { + String originalId = ses.getOriginalId(); - sessions.remove(originalId); - ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), ses); + if (!ses.getId().equals(originalId)) { + sessions.remove(originalId); + + ses.resetOriginalId(); + ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), ses); + } + else if (ses.hasChanges()) { + if (saveMode == SaveMode.ALWAYS) + ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), ses); + else + updatePartial(ses); + } } - else if (ses.hasChanges()) - ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), ses); ses.clearChangeFlags(); } @@ -344,4 +359,46 @@ public class IgniteIndexedSessionRepository if (flushMode == FlushMode.IMMEDIATE) save(ses); } + + /** + * @param targetSes Target session. + * @param activeSes Active session. + */ + private void copyChanges(IgniteSession targetSes, IgniteSession activeSes) { + if (activeSes.isLastAccessedTimeChanged()) + targetSes.setLastAccessedTime(activeSes.getLastAccessedTime()); + + Map<String, Object> changes = activeSes.getAttributesChanges(); + + if (!changes.isEmpty()) + changes.forEach(targetSes::setAttribute); + } + + /** + * @param ses Session. + */ + private void updatePartial(IgniteSession ses) { + IgniteSession oldSes, updatedSes; + int attempt = 0; + + do { + attempt++; + + oldSes = sessions.get(ses.getId()); + + if (oldSes == null) + break; + + updatedSes = new IgniteSession(oldSes.getDelegate(), idxResolver, false, saveMode, this::flushImmediateIfNecessary); + copyChanges(updatedSes, ses); + + if (attempt > MAX_UPDATE_ATTEMPT) { + logger.warn("Session maximum update attempts has been reached," + + " 'replace' will be used instead [id=" + updatedSes.getId() + "]"); + + ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), updatedSes); + break; + } + } while (ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), oldSes, updatedSes)); + } } diff --git a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java index 1d0ce5bc..7663086a 100644 --- a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java +++ b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java @@ -2,6 +2,7 @@ package org.apache.ignite.spring.sessions; import java.time.Duration; import java.time.Instant; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -13,6 +14,7 @@ import org.springframework.session.SaveMode; import org.springframework.session.Session; import static org.springframework.session.FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME; +import static org.springframework.session.SaveMode.ON_GET_ATTRIBUTE; /** * A custom implementation of {@link Session} that uses a {@link MapSession} as the basis for its mapping. It keeps @@ -43,7 +45,7 @@ public class IgniteSession implements Session { /** */ @GridDirectTransient - private transient boolean attrsChanged; + private final transient Map<String, Object> delta = new HashMap<>(); /** The index resolver. */ @GridDirectTransient @@ -58,33 +60,33 @@ public class IgniteSession implements Session { private final transient Consumer<IgniteSession> flusher; /** - * @param cached The map session. + * @param delegate The map session. * @param idxResolver The index resolver. * @param isNew Is new flag. * @param saveMode Mode of tracking and saving session changes to session store. * @param flusher Flusher for session store. */ IgniteSession( - MapSession cached, + MapSession delegate, IndexResolver<Session> idxResolver, boolean isNew, SaveMode saveMode, Consumer<IgniteSession> flusher ) { - this.delegate = cached; - this.idxResolver = idxResolver; + this.delegate = delegate; this.isNew = isNew; + + this.idxResolver = idxResolver; this.saveMode = saveMode; this.flusher = flusher; - principal = delegate.getAttribute(PRINCIPAL_NAME_INDEX_NAME); + principal = this.delegate.getAttribute(PRINCIPAL_NAME_INDEX_NAME); - if (isNew) { - if (saveMode == SaveMode.ALWAYS) - attrsChanged = true; + if (this.isNew || this.saveMode == SaveMode.ALWAYS) + getAttributeNames().forEach(attrName -> delta.put(attrName, this.delegate.getAttribute(attrName))); - flusher.accept(this); - } + if (isNew) + this.flusher.accept(this); } /** {@inheritDoc} */ @@ -137,8 +139,8 @@ public class IgniteSession implements Session { @Override public <T> T getAttribute(String attrName) { T attrVal = this.delegate.getAttribute(attrName); - if (attrVal != null && saveMode.equals(SaveMode.ON_GET_ATTRIBUTE)) - attrsChanged = true; + if (attrVal != null && saveMode.equals(ON_GET_ATTRIBUTE)) + delta.put(attrName, attrVal); return attrVal; } @@ -151,14 +153,16 @@ public class IgniteSession implements Session { /** {@inheritDoc} */ @Override public void setAttribute(String attrName, Object attrVal) { delegate.setAttribute(attrName, attrVal); - attrsChanged = true; - + delta.put(attrName, attrVal); + if (SPRING_SECURITY_CONTEXT.equals(attrName)) { Map<String, String> indexes = idxResolver.resolveIndexesFor(this); String principal = (attrVal != null) ? indexes.get(PRINCIPAL_NAME_INDEX_NAME) : null; this.principal = principal; + delegate.setAttribute(PRINCIPAL_NAME_INDEX_NAME, principal); + delta.put(PRINCIPAL_NAME_INDEX_NAME, principal); } flusher.accept(this); @@ -186,15 +190,25 @@ public class IgniteSession implements Session { /** * @return {@code True} if session is changed. */ - public boolean hasChanges() { - return lastAccessedTimeChanged || maxInactiveIntervalChanged || attrsChanged; + public Map<String, Object> getAttributesChanges() { + return new HashMap<>(delta); } /** - * @return {@code True} if session ID is changed. + * Get the original session id. + * @return the original session id. + * @see #changeSessionId() */ - public boolean hasChangedSessionId() { - return !delegate.getId().equals(delegate.getOriginalId()); + public String getOriginalId() { + return delegate.getOriginalId(); + } + + /** + * Reset the original session id. + * @see #changeSessionId() + */ + public void resetOriginalId() { + delegate = new MapSession(delegate); } /** Reset the change flags. */ @@ -202,10 +216,21 @@ public class IgniteSession implements Session { isNew = false; lastAccessedTimeChanged = false; maxInactiveIntervalChanged = false; - attrsChanged = false; + delta.clear(); + } - if (hasChangedSessionId()) - delegate = new MapSession(delegate); + /** + * @return Last accessed time changed. + */ + public boolean isLastAccessedTimeChanged() { + return lastAccessedTimeChanged; + } + + /** + * @return Session changed. + */ + public boolean hasChanges() { + return lastAccessedTimeChanged || maxInactiveIntervalChanged || !delta.isEmpty(); } /** {@inheritDoc} */ diff --git a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java index f8f6a6ce..bcde5858 100644 --- a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java +++ b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java @@ -77,6 +77,11 @@ public class ClientSessionProxy implements SessionProxy { return cache.replace(key, val); } + /** {@inheritDoc} */ + @Override public boolean replace(String key, IgniteSession oldVal, IgniteSession newVal) { + return cache.replace(key, oldVal, newVal); + } + /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(Query<R> qry) { return cache.query(qry); diff --git a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java index 8734192a..6f41918e 100644 --- a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java +++ b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java @@ -77,6 +77,11 @@ public class IgniteSessionProxy implements SessionProxy { return cache.replace(key, val); } + /** {@inheritDoc} */ + @Override public boolean replace(String key, IgniteSession oldVal, IgniteSession newVal) { + return cache.replace(key, oldVal, newVal); + } + /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(Query<R> qry) { return cache.query(qry); diff --git a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java index 12b44466..f31d4433 100644 --- a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java +++ b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java @@ -23,6 +23,7 @@ import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.event.CacheEntryListener; import javax.cache.expiry.ExpiryPolicy; import javax.cache.integration.CacheWriter; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.spring.sessions.IgniteSession; @@ -108,6 +109,14 @@ public interface SessionProxy { */ public boolean replace(String key, IgniteSession val); + /** + * {@inheritDoc} + * <p> + * For {@link CacheAtomicityMode#ATOMIC} return + * value on primary node crash may be incorrect because of the automatic retries. + */ + public boolean replace(String key, IgniteSession oldVal, IgniteSession newVal); + /** * Execute SQL query and get cursor to iterate over results. *
