This is an automated email from the ASF dual-hosted git repository.

anovikov pushed a commit to branch ignite-18302
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git

commit d0f27f9a16c2605b3eeefedbd2896463adf941f4
Author: Andrey Novikov <[email protected]>
AuthorDate: Fri Sep 1 11:38:41 2023 +0700

    IGNITE-18302 Add support for different save modes.
---
 .../sessions/IgniteIndexedSessionRepository.java   | 75 +++++++++++++++++++---
 .../ignite/spring/sessions/IgniteSession.java      | 71 +++++++++++++-------
 .../spring/sessions/proxy/ClientSessionProxy.java  |  5 ++
 .../spring/sessions/proxy/IgniteSessionProxy.java  |  5 ++
 .../ignite/spring/sessions/proxy/SessionProxy.java |  9 +++
 5 files changed, 133 insertions(+), 32 deletions(-)

diff --git 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java
 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java
index 4d83f0b5..25a3e315 100644
--- 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java
+++ 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteIndexedSessionRepository.java
@@ -93,6 +93,13 @@ public class IgniteIndexedSessionRepository
      */
     public static final String DEFAULT_SESSION_MAP_NAME = 
"spring:session:sessions";
 
+    /**
+     * Maximum of attempts for atomicity replace. If something wrong with 
IgniteSession, old value can never be equal to
+     * value from repository. In this case replace will never end the loop. If 
this value is exceeded, then plain
+     * {@link SessionProxy#replace(String, IgniteSession)} will be used.
+     */
+    private static final int MAX_UPDATE_ATTEMPT = 100;
+
     /** */
     private static final Log logger = 
LogFactory.getLog(IgniteIndexedSessionRepository.class);
 
@@ -109,7 +116,7 @@ public class IgniteIndexedSessionRepository
     private FlushMode flushMode = FlushMode.ON_SAVE;
 
     /** The save mode. */
-    private SaveMode saveMode = SaveMode.ON_SET_ATTRIBUTE;
+    private SaveMode saveMode = SaveMode.ALWAYS;
 
     /** The index resolver. */
     private IndexResolver<Session> idxResolver = new 
DelegatingIndexResolver<>(new PrincipalNameIndexResolver<>());
@@ -173,8 +180,8 @@ public class IgniteIndexedSessionRepository
 
     /**
      * Set the maximum inactive interval in seconds between requests before 
newly created
-     * sessions will be invalidated. A negative time indicates that the 
session will never
-     * timeout. The default is 1800 (30 minutes).
+     * sessions will be invalidated. A negative time indicates that the 
session will never timeout.
+     * The default is 1800 (30 minutes).
      * @param dfltMaxInactiveInterval the maximum inactive interval in seconds
      */
     public void setDefaultMaxInactiveInterval(Integer dfltMaxInactiveInterval) 
{
@@ -223,14 +230,22 @@ public class IgniteIndexedSessionRepository
     @Override public void save(IgniteSession ses) {
         if (ses.isNew())
             ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), ses);
-        else if (ses.hasChangedSessionId()) {
-            String originalId = ses.getDelegate().getOriginalId();
+        else {
+            String originalId = ses.getOriginalId();
 
-            sessions.remove(originalId);
-            ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), ses);
+            if (!ses.getId().equals(originalId)) {
+                sessions.remove(originalId);
+
+                ses.resetOriginalId();
+                ttlSessions(ses.getMaxInactiveInterval()).put(ses.getId(), 
ses);
+            }
+            else if (ses.hasChanges()) {
+                if (saveMode == SaveMode.ALWAYS)
+                    
ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), ses);
+                else
+                    updatePartial(ses);
+            }
         }
-        else if (ses.hasChanges())
-            ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), 
ses);
         
         ses.clearChangeFlags();
     }
@@ -344,4 +359,46 @@ public class IgniteIndexedSessionRepository
         if (flushMode == FlushMode.IMMEDIATE)
             save(ses);
     }
+
+    /**
+     * @param targetSes Target session.
+     * @param activeSes Active session.
+     */
+    private void copyChanges(IgniteSession targetSes, IgniteSession activeSes) 
{
+        if (activeSes.isLastAccessedTimeChanged())
+            targetSes.setLastAccessedTime(activeSes.getLastAccessedTime());
+
+        Map<String, Object> changes = activeSes.getAttributesChanges();
+
+        if (!changes.isEmpty())
+            changes.forEach(targetSes::setAttribute);
+    }
+
+    /**
+     * @param ses Session.
+     */
+    private void updatePartial(IgniteSession ses) {
+        IgniteSession oldSes, updatedSes;
+        int attempt = 0;
+
+        do {
+            attempt++;
+
+            oldSes = sessions.get(ses.getId());
+
+            if (oldSes == null)
+                break;
+
+            updatedSes = new IgniteSession(oldSes.getDelegate(), idxResolver, 
false, saveMode, this::flushImmediateIfNecessary);
+            copyChanges(updatedSes, ses);
+
+            if (attempt > MAX_UPDATE_ATTEMPT) {
+                logger.warn("Session maximum update attempts has been 
reached," +
+                    " 'replace' will be used instead [id=" + 
updatedSes.getId() + "]");
+
+                ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), 
updatedSes);
+                break;
+            }
+        } while 
(ttlSessions(ses.getMaxInactiveInterval()).replace(ses.getId(), oldSes, 
updatedSes));
+    }
 }
diff --git 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java
 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java
index 1d0ce5bc..7663086a 100644
--- 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java
+++ 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/IgniteSession.java
@@ -2,6 +2,7 @@ package org.apache.ignite.spring.sessions;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -13,6 +14,7 @@ import org.springframework.session.SaveMode;
 import org.springframework.session.Session;
 
 import static 
org.springframework.session.FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME;
+import static org.springframework.session.SaveMode.ON_GET_ATTRIBUTE;
 
 /**
  * A custom implementation of {@link Session} that uses a {@link MapSession} 
as the basis for its mapping. It keeps
@@ -43,7 +45,7 @@ public class IgniteSession implements Session {
 
     /** */
     @GridDirectTransient
-    private transient boolean attrsChanged;
+    private final transient Map<String, Object> delta = new HashMap<>();
 
     /** The index resolver. */
     @GridDirectTransient
@@ -58,33 +60,33 @@ public class IgniteSession implements Session {
     private final transient Consumer<IgniteSession> flusher;
 
     /**
-     * @param cached The map session.
+     * @param delegate The map session.
      * @param idxResolver The index resolver.
      * @param isNew Is new flag.
      * @param saveMode Mode of tracking and saving session changes to session 
store.
      * @param flusher Flusher for session store.
      */
     IgniteSession(
-        MapSession cached,
+        MapSession delegate,
         IndexResolver<Session> idxResolver,
         boolean isNew,
         SaveMode saveMode,
         Consumer<IgniteSession> flusher
     ) {
-        this.delegate = cached;
-        this.idxResolver = idxResolver;
+        this.delegate = delegate;
         this.isNew = isNew;
+
+        this.idxResolver = idxResolver;
         this.saveMode = saveMode;
         this.flusher = flusher;
 
-        principal = delegate.getAttribute(PRINCIPAL_NAME_INDEX_NAME);
+        principal = this.delegate.getAttribute(PRINCIPAL_NAME_INDEX_NAME);
 
-        if (isNew) {
-            if (saveMode == SaveMode.ALWAYS)
-                attrsChanged = true;
+        if (this.isNew || this.saveMode == SaveMode.ALWAYS)
+            getAttributeNames().forEach(attrName -> delta.put(attrName, 
this.delegate.getAttribute(attrName)));
 
-            flusher.accept(this);
-        }
+        if (isNew)
+            this.flusher.accept(this);
     }
 
     /** {@inheritDoc} */
@@ -137,8 +139,8 @@ public class IgniteSession implements Session {
     @Override public <T> T getAttribute(String attrName) {
         T attrVal = this.delegate.getAttribute(attrName);
 
-        if (attrVal != null && saveMode.equals(SaveMode.ON_GET_ATTRIBUTE))
-            attrsChanged = true;
+        if (attrVal != null && saveMode.equals(ON_GET_ATTRIBUTE))
+            delta.put(attrName, attrVal);
 
         return attrVal;
     }
@@ -151,14 +153,16 @@ public class IgniteSession implements Session {
     /** {@inheritDoc} */
     @Override public void setAttribute(String attrName, Object attrVal) {
         delegate.setAttribute(attrName, attrVal);
-        attrsChanged = true;
-        
+        delta.put(attrName, attrVal);
+
         if (SPRING_SECURITY_CONTEXT.equals(attrName)) {
             Map<String, String> indexes = idxResolver.resolveIndexesFor(this);
             String principal = (attrVal != null) ? 
indexes.get(PRINCIPAL_NAME_INDEX_NAME) : null;
 
             this.principal = principal;
+            
             delegate.setAttribute(PRINCIPAL_NAME_INDEX_NAME, principal);
+            delta.put(PRINCIPAL_NAME_INDEX_NAME, principal);
         }
 
         flusher.accept(this);
@@ -186,15 +190,25 @@ public class IgniteSession implements Session {
     /**
      * @return {@code True} if session is changed.
      */
-    public boolean hasChanges() {
-        return lastAccessedTimeChanged || maxInactiveIntervalChanged || 
attrsChanged;
+    public Map<String, Object> getAttributesChanges() {
+        return new HashMap<>(delta);
     }
 
     /**
-     * @return {@code True} if session ID is changed.
+     * Get the original session id.
+     * @return the original session id.
+     * @see #changeSessionId()
      */
-    public boolean hasChangedSessionId() {
-        return !delegate.getId().equals(delegate.getOriginalId());
+    public String getOriginalId() {
+        return delegate.getOriginalId();
+    }
+
+    /**
+     * Reset the original session id.
+     * @see #changeSessionId()
+     */
+    public void resetOriginalId() {
+        delegate = new MapSession(delegate);
     }
 
     /** Reset the change flags. */
@@ -202,10 +216,21 @@ public class IgniteSession implements Session {
         isNew = false;
         lastAccessedTimeChanged = false;
         maxInactiveIntervalChanged = false;
-        attrsChanged = false;
+        delta.clear();
+    }
 
-        if (hasChangedSessionId())
-            delegate = new MapSession(delegate);
+    /**
+     * @return Last accessed time changed.
+     */
+    public boolean isLastAccessedTimeChanged() {
+        return lastAccessedTimeChanged;
+    }
+
+    /**
+     * @return Session changed.
+     */
+    public boolean hasChanges() {
+        return lastAccessedTimeChanged || maxInactiveIntervalChanged || 
!delta.isEmpty();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java
 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java
index f8f6a6ce..bcde5858 100644
--- 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java
+++ 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/ClientSessionProxy.java
@@ -77,6 +77,11 @@ public class ClientSessionProxy implements SessionProxy {
         return cache.replace(key, val);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean replace(String key, IgniteSession oldVal, 
IgniteSession newVal) {
+        return cache.replace(key, oldVal, newVal);
+    }
+
     /** {@inheritDoc} */
     @Override public <R> QueryCursor<R> query(Query<R> qry) {
         return cache.query(qry);
diff --git 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java
 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java
index 8734192a..6f41918e 100644
--- 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java
+++ 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/IgniteSessionProxy.java
@@ -77,6 +77,11 @@ public class IgniteSessionProxy implements SessionProxy {
         return cache.replace(key, val);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean replace(String key, IgniteSession oldVal, 
IgniteSession newVal) {
+        return cache.replace(key, oldVal, newVal);
+    }
+
     /** {@inheritDoc} */
     @Override public <R> QueryCursor<R> query(Query<R> qry) {
         return cache.query(qry);
diff --git 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java
 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java
index 12b44466..f31d4433 100644
--- 
a/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java
+++ 
b/modules/spring-session-ext/src/main/java/org/apache/ignite/spring/sessions/proxy/SessionProxy.java
@@ -23,6 +23,7 @@ import 
javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryListener;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.integration.CacheWriter;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.spring.sessions.IgniteSession;
@@ -108,6 +109,14 @@ public interface SessionProxy {
      */
     public boolean replace(String key, IgniteSession val);
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@link CacheAtomicityMode#ATOMIC} return
+     * value on primary node crash may be incorrect because of the automatic 
retries.
+     */
+    public boolean replace(String key, IgniteSession oldVal, IgniteSession 
newVal);
+
     /**
      * Execute SQL query and get cursor to iterate over results.
      *

Reply via email to