This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 591b96914769ae82cf8c9dc09897bc395bbd3597
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jan 30 10:22:07 2024 +0100

    CAMEL-19749: variables - Should also copy message headers into variable 
when using EIP variables
---
 .../apache/camel/spi/StreamCachingStrategy.java    |  8 +++
 .../impl/engine/DefaultStreamCachingStrategy.java  | 18 ++++-
 .../camel/language/tokenizer/TokenizeLanguage.java |  3 +-
 ...sitory.java => AbstractVariableRepository.java} | 83 +++++++++++++---------
 .../camel/support/ExchangeVariableRepository.java  | 74 ++-----------------
 .../camel/support/GlobalVariableRepository.java    | 60 +---------------
 .../camel/support/builder/ExpressionBuilder.java   |  8 +--
 7 files changed, 85 insertions(+), 169 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index 68de83dd718..267949c9ba6 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -282,4 +282,12 @@ public interface StreamCachingStrategy extends 
StaticService {
      */
     StreamCache cache(Message message);
 
+    /**
+     * Caches the value as a {@link StreamCache}.
+     *
+     * @param  value the value
+     * @return       the value cached as a {@link StreamCache}, or 
<tt>null</tt> if not possible or no need to cache
+     */
+    StreamCache cache(Object value);
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
index 4c0919f7b7e..92df0417bd8 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
@@ -249,21 +249,33 @@ public class DefaultStreamCachingStrategy extends 
ServiceSupport implements Came
 
     @Override
     public StreamCache cache(Exchange exchange) {
-        return cache(exchange.getMessage());
+        return doCache(exchange.getMessage().getBody(), exchange);
     }
 
     @Override
     public StreamCache cache(Message message) {
+        return doCache(message.getBody(), message.getExchange());
+    }
+
+    @Override
+    public StreamCache cache(Object body) {
+        return doCache(body, null);
+    }
+
+    private StreamCache doCache(Object body, Exchange exchange) {
         StreamCache cache = null;
         // try convert to stream cache
-        Object body = message.getBody();
         if (body != null) {
             boolean allowed = allowClasses == null && denyClasses == null;
             if (!allowed) {
                 allowed = checkAllowDenyList(body);
             }
             if (allowed) {
-                cache = 
camelContext.getTypeConverter().convertTo(StreamCache.class, 
message.getExchange(), body);
+                if (exchange != null) {
+                    cache = 
camelContext.getTypeConverter().convertTo(StreamCache.class, exchange, body);
+                } else {
+                    cache = 
camelContext.getTypeConverter().convertTo(StreamCache.class, body);
+                }
             }
         }
         if (cache != null) {
diff --git 
a/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java
 
b/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java
index 62b63391ba1..3662e1b0284 100644
--- 
a/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java
+++ 
b/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java
@@ -133,7 +133,8 @@ public class TokenizeLanguage extends 
SingleInputLanguageSupport implements Prop
 
         if (answer == null) {
             // use the regular tokenizer
-            final Expression exp = 
ExpressionBuilder.singleInputExpression(getVariableName(), getHeaderName(), 
getPropertyName());
+            final Expression exp
+                    = 
ExpressionBuilder.singleInputExpression(getVariableName(), getHeaderName(), 
getPropertyName());
             if (regex) {
                 answer = ExpressionBuilder.regexTokenizeExpression(exp, token);
             } else {
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java
similarity index 58%
copy from 
core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
copy to 
core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java
index 2101982bf4a..ba6655b0541 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java
@@ -22,49 +22,35 @@ import java.util.stream.Stream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
 import org.apache.camel.StreamCache;
-import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.StreamCacheException;
 import org.apache.camel.spi.BrowsableVariableRepository;
-import org.apache.camel.spi.VariableRepository;
+import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.CaseInsensitiveMap;
-import org.apache.camel.util.StringHelper;
 
 /**
- * {@link VariableRepository} which is local per {@link Exchange} to hold 
request-scoped variables.
+ * Base class for {@link org.apache.camel.spi.VariableRepository} 
implementations that store variables in memory.
  */
-class ExchangeVariableRepository extends ServiceSupport implements 
BrowsableVariableRepository, NonManagedService {
+public abstract class AbstractVariableRepository extends ServiceSupport
+        implements BrowsableVariableRepository, CamelContextAware {
 
     private final Map<String, Object> variables = new ConcurrentHashMap<>(8);
-    private final CamelContext camelContext;
+    private CamelContext camelContext;
+    private StreamCachingStrategy strategy;
 
-    public ExchangeVariableRepository(CamelContext camelContext) {
-        this.camelContext = camelContext;
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
     }
 
     @Override
-    public String getId() {
-        return "exchange";
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
     }
 
     @Override
     public Object getVariable(String name) {
         Object answer = variables.get(name);
-        if (answer == null && name.endsWith(".headers")) {
-            String prefix = name.substring(0, name.length() - 1) + "."; // 
xxx.headers -> xxx.header.
-            // we want all headers for a given variable
-            Map<String, Object> map = new CaseInsensitiveMap();
-            for (Map.Entry<String, Object> entry : variables.entrySet()) {
-                String key = entry.getKey();
-                if (key.startsWith(prefix)) {
-                    key = StringHelper.after(key, prefix);
-                    map.put(key, entry.getValue());
-                }
-            }
-            return map;
-        }
         if (answer instanceof StreamCache sc) {
             // reset so the cache is ready to be used as a variable
             sc.reset();
@@ -74,15 +60,10 @@ class ExchangeVariableRepository extends ServiceSupport 
implements BrowsableVari
 
     @Override
     public void setVariable(String name, Object value) {
-        // special for some values that are CachedOutputStream which we want 
to be re-readable and therefore
-        // convert this to StreamCache
-        // TODO: Do something like StreamCachingHelper
-        // TODO: support base class that has stream caching stuff for 
set/getVariable
-        if (camelContext.isStreamCaching())
-            return StreamCachingHelper.convertToStreamCache(strategy, 
exchange, exchange.getIn());
-            Object cache = 
camelContext.getTypeConverter().tryConvertTo(StreamCache.class, value);
-            if (cache != null) {
-                value = cache;
+        if (value != null && strategy != null) {
+            StreamCache sc = convertToStreamCache(value);
+            if (sc != null) {
+                value = sc;
             }
         }
         if (value != null) {
@@ -125,4 +106,36 @@ class ExchangeVariableRepository extends ServiceSupport 
implements BrowsableVari
         }
         return variables.remove(name);
     }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        if (camelContext != null && camelContext.isStreamCaching()) {
+            strategy = camelContext.getStreamCachingStrategy();
+        }
+    }
+
+    protected StreamCache convertToStreamCache(Object body) {
+        // check if body is already cached
+        if (body == null) {
+            return null;
+        } else if (body instanceof StreamCache) {
+            StreamCache sc = (StreamCache) body;
+            // reset so the cache is ready to be used before processing
+            sc.reset();
+            return sc;
+        }
+        return tryStreamCache(body);
+    }
+
+    protected StreamCache tryStreamCache(Object body) {
+        try {
+            // cache the value; null means it could not or need not be cached
+            return strategy.cache(body);
+        } catch (Exception e) {
+            throw new StreamCacheException(body, e);
+        }
+    }
+
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
index 2101982bf4a..1d75855542f 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java
@@ -17,31 +17,24 @@
 package org.apache.camel.support;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
 import org.apache.camel.StreamCache;
-import org.apache.camel.converter.stream.CachedOutputStream;
-import org.apache.camel.spi.BrowsableVariableRepository;
 import org.apache.camel.spi.VariableRepository;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CaseInsensitiveMap;
 import org.apache.camel.util.StringHelper;
 
 /**
  * {@link VariableRepository} which is local per {@link Exchange} to hold 
request-scoped variables.
  */
-class ExchangeVariableRepository extends ServiceSupport implements 
BrowsableVariableRepository, NonManagedService {
-
-    private final Map<String, Object> variables = new ConcurrentHashMap<>(8);
-    private final CamelContext camelContext;
+final class ExchangeVariableRepository extends AbstractVariableRepository {
 
     public ExchangeVariableRepository(CamelContext camelContext) {
-        this.camelContext = camelContext;
+        setCamelContext(camelContext);
+        // ensure it's started
+        ServiceHelper.startService(this);
     }
 
     @Override
@@ -51,12 +44,12 @@ class ExchangeVariableRepository extends ServiceSupport 
implements BrowsableVari
 
     @Override
     public Object getVariable(String name) {
-        Object answer = variables.get(name);
+        Object answer = super.getVariable(name);
         if (answer == null && name.endsWith(".headers")) {
             String prefix = name.substring(0, name.length() - 1) + "."; // 
xxx.headers -> xxx.header.
             // we want all headers for a given variable
             Map<String, Object> map = new CaseInsensitiveMap();
-            for (Map.Entry<String, Object> entry : variables.entrySet()) {
+            for (Map.Entry<String, Object> entry : getVariables().entrySet()) {
                 String key = entry.getKey();
                 if (key.startsWith(prefix)) {
                     key = StringHelper.after(key, prefix);
@@ -72,57 +65,4 @@ class ExchangeVariableRepository extends ServiceSupport 
implements BrowsableVari
         return answer;
     }
 
-    @Override
-    public void setVariable(String name, Object value) {
-        // special for some values that are CachedOutputStream which we want 
to be re-readable and therefore
-        // convert this to StreamCache
-        // TODO: Do something like StreamCachingHelper
-        // TODO: support base class that has stream caching stuff for 
set/getVariable
-        if (camelContext.isStreamCaching())
-            return StreamCachingHelper.convertToStreamCache(strategy, 
exchange, exchange.getIn());
-            Object cache = 
camelContext.getTypeConverter().tryConvertTo(StreamCache.class, value);
-            if (cache != null) {
-                value = cache;
-            }
-        }
-        if (value != null) {
-            // avoid the NullPointException
-            variables.put(name, value);
-        } else {
-            // if the value is null, we just remove the key from the map
-            variables.remove(name);
-        }
-    }
-
-    public boolean hasVariables() {
-        return !variables.isEmpty();
-    }
-
-    public int size() {
-        return variables.size();
-    }
-
-    public Stream<String> names() {
-        return variables.keySet().stream();
-    }
-
-    public Map<String, Object> getVariables() {
-        return variables;
-    }
-
-    public void setVariables(Map<String, Object> map) {
-        variables.putAll(map);
-    }
-
-    public void clear() {
-        variables.clear();
-    }
-
-    @Override
-    public Object removeVariable(String name) {
-        if (!hasVariables()) {
-            return null;
-        }
-        return variables.remove(name);
-    }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java
index 09f1e7d7270..cf7abcd1fd8 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java
@@ -17,75 +17,17 @@
 package org.apache.camel.support;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Stream;
 
-import org.apache.camel.StreamCache;
-import org.apache.camel.spi.BrowsableVariableRepository;
 import org.apache.camel.spi.VariableRepository;
-import org.apache.camel.support.service.ServiceSupport;
 
 /**
  * Global {@link VariableRepository} which stores variables in-memory in a 
{@link Map}.
  */
-public final class GlobalVariableRepository extends ServiceSupport implements 
BrowsableVariableRepository {
-
-    private final ConcurrentMap<String, Object> variables = new 
ConcurrentHashMap<>();
+public final class GlobalVariableRepository extends AbstractVariableRepository 
{
 
     @Override
     public String getId() {
         return "global";
     }
 
-    @Override
-    public Object getVariable(String name) {
-        Object answer = variables.get(name);
-        if (answer instanceof StreamCache sc) {
-            // reset so the cache is ready to be used as a variable
-            sc.reset();
-        }
-        return answer;
-    }
-
-    @Override
-    public void setVariable(String name, Object value) {
-        if (value != null) {
-            // avoid the NullPointException
-            variables.put(name, value);
-        } else {
-            // if the value is null, we just remove the key from the map
-            variables.remove(name);
-        }
-    }
-
-    @Override
-    public Object removeVariable(String name) {
-        return variables.remove(name);
-    }
-
-    @Override
-    public boolean hasVariables() {
-        return !variables.isEmpty();
-    }
-
-    @Override
-    public int size() {
-        return variables.size();
-    }
-
-    @Override
-    public Stream<String> names() {
-        return variables.keySet().stream();
-    }
-
-    @Override
-    public Map<String, Object> getVariables() {
-        return variables;
-    }
-
-    @Override
-    public void clear() {
-        variables.clear();
-    }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java
index 4853d3eafe7..6ea8d094b0b 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java
@@ -1142,13 +1142,13 @@ public class ExpressionBuilder {
     }
 
     /**
-     * @param variableName  the name of the variable from which the input data 
must be extracted if not empty.
+     * @param  variableName the name of the variable from which the input data 
must be extracted if not empty.
      * @param  headerName   the name of the header from which the input data 
must be extracted if not empty.
      * @param  propertyName the name of the property from which the input data 
must be extracted if not empty and
      *                      {@code headerName} is empty.
-     * @return              a variable expression if {@code variableName} is 
not empty,
-     *                      a header expression if {@code headerName} is not 
empty, otherwise a property expression if
-     *                      {@code propertyName} is not empty or finally a 
body expression.
+     * @return              a variable expression if {@code variableName} is 
not empty, a header expression if
+     *                      {@code headerName} is not empty, otherwise a 
property expression if {@code propertyName} is
+     *                      not empty or finally a body expression.
      */
     public static Expression singleInputExpression(String variableName, String 
headerName, String propertyName) {
         final Expression exp;

Reply via email to