This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch unomi-3-dev
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-3-dev by this push:
new b4a8b4f6a Reduce log noise
b4a8b4f6a is described below
commit b4a8b4f6acb49e2a312c9ad3bdd5a60cb4356fdd
Author: Serge Huber <[email protected]>
AuthorDate: Sun Dec 7 09:48:58 2025 +0100
Reduce log noise
---
.../org/apache/unomi/api/utils/ParserHelper.java | 13 +-
.../org/apache/unomi/itests/tools/LogChecker.java | 30 ++++
.../cache/AbstractMultiTypeCachingService.java | 10 +-
.../impl/definitions/DefinitionsServiceImpl.java | 12 +-
.../impl/scheduler/SchedulerServiceImpl.java | 4 +-
.../validation/ConditionValidationServiceImpl.java | 2 +-
.../impl/InMemoryPersistenceServiceImpl.java | 171 +++++++++++----------
7 files changed, 140 insertions(+), 102 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/utils/ParserHelper.java
b/api/src/main/java/org/apache/unomi/api/utils/ParserHelper.java
index 75d906cae..e31e2eaba 100644
--- a/api/src/main/java/org/apache/unomi/api/utils/ParserHelper.java
+++ b/api/src/main/java/org/apache/unomi/api/utils/ParserHelper.java
@@ -43,6 +43,8 @@ public class ParserHelper {
private static final Set<String> unresolvedActionTypes = new HashSet<>();
private static final Set<String> unresolvedConditionTypes = new
HashSet<>();
+ // Track rules that have already been warned about null/empty actions to
avoid log spam
+ private static final Set<String> warnedRulesWithNullActions =
Collections.synchronizedSet(new HashSet<>());
private static final String VALUE_NAME_SEPARATOR = "::";
private static final String PLACEHOLDER_PREFIX = "${";
@@ -187,15 +189,22 @@ public class ParserHelper {
public static boolean resolveActionTypes(DefinitionsService
definitionsService, Rule rule, boolean ignoreErrors) {
boolean result = true;
+ String ruleId = rule.getItemId();
if (rule.getActions() == null) {
if (!ignoreErrors) {
- LOGGER.warn("Rule {}:{} has null actions", rule.getItemId(),
rule.getMetadata().getName());
+ // Only warn once per rule to avoid log spam
+ if (warnedRulesWithNullActions.add(ruleId)) {
+ LOGGER.warn("Rule {}:{} has null actions", ruleId,
rule.getMetadata().getName());
+ }
}
return false;
}
if (rule.getActions().isEmpty()) {
if (!ignoreErrors) {
- LOGGER.warn("Rule {}:{} has empty actions", rule.getItemId(),
rule.getMetadata().getName());
+ // Only warn once per rule to avoid log spam
+ if (warnedRulesWithNullActions.add(ruleId)) {
+ LOGGER.warn("Rule {}:{} has empty actions", ruleId,
rule.getMetadata().getName());
+ }
}
return false;
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/tools/LogChecker.java
b/itests/src/test/java/org/apache/unomi/itests/tools/LogChecker.java
index b00c6b485..bb3f528b2 100644
--- a/itests/src/test/java/org/apache/unomi/itests/tools/LogChecker.java
+++ b/itests/src/test/java/org/apache/unomi/itests/tools/LogChecker.java
@@ -309,6 +309,10 @@ public class LogChecker {
addIgnoredPattern("InvalidRequestExceptionMapper.*events collector
cannot be empty");
addIgnoredPattern("InvalidRequestExceptionMapper.*Unable to
deserialize object because");
addIgnoredPattern("InvalidRequestExceptionMapper.*Incoming POST
request blocked because exceeding maximum bytes size");
+ // RequestValidatorInterceptor warnings (expected when testing request
size limits)
+ addIgnoredPattern("RequestValidatorInterceptor.*Incoming POST request
blocked because exceeding maximum bytes size");
+ addIgnoredPattern("RequestValidatorInterceptor.*has thrown exception,
unwinding now");
+ addIgnoredPattern("InvalidRequestException.*Incoming POST request
blocked because exceeding maximum bytes size");
// Test-related schema errors (expected in JSONSchemaIT and other
tests)
addIgnoredPattern("Schema not found for event type: dummy");
@@ -317,6 +321,24 @@ public class LogChecker {
addIgnoredPattern("Error executing system
operation:.*ValidationException.*Schema not found");
addIgnoredPattern("Couldn't find persona");
addIgnoredPattern("Unable to save schema");
+ addIgnoredPattern("SchemaServiceImpl.*Couldn't find schema");
+ addIgnoredPattern("Failed to load json schema!");
+ addIgnoredPattern("Couldn't find schema.*vendor.test.com");
+ addIgnoredPattern("JsonSchemaException.*Couldn't find schema");
+ addIgnoredPattern("InvocationTargetException.*JsonSchemaException");
+ addIgnoredPattern("IOException.*Couldn't find schema");
+ // Schema validation warnings (expected during schema validation tests)
+ addIgnoredPattern("SchemaServiceImpl.*Schema validation found.*errors
while validating");
+ addIgnoredPattern("SchemaServiceImpl.*Validation error.*does not match
the regex pattern");
+ addIgnoredPattern("SchemaServiceImpl.*An error occurred during the
validation of your event");
+ addIgnoredPattern("SchemaServiceImpl.*Validation error: There are
unevaluated properties");
+ addIgnoredPattern("SchemaServiceImpl.*Validation error: Unknown scope
value");
+ addIgnoredPattern("SchemaServiceImpl.*Validation error:.*may only have
a maximum of.*properties");
+ addIgnoredPattern("SchemaServiceImpl.*Validation error:.*string found,
number expected");
+
+ // Action type resolution warnings (expected in tests with missing
action types)
+ addIgnoredPattern("ParserHelper.*Couldn't resolve action type");
+ addIgnoredPattern("ResolverServiceImpl.*Marked rules.*as invalid:
Unresolved action type");
// Test-related property copy errors (expected in
CopyPropertiesActionIT)
addIgnoredPattern("Impossible to copy the property");
@@ -333,6 +355,14 @@ public class LogChecker {
addIgnoredPattern("Error while executing in class
loader.*scrollIdentifier=dummyScrollId");
addIgnoredPattern("Error while executing in class loader.*Error
loading itemType");
addIgnoredPattern("Error while executing in class loader.*Error
continuing scrolling query");
+ addIgnoredPattern("Error continuing scrolling
query.*scrollIdentifier=dummyScrollId");
+ addIgnoredPattern("Error continuing scrolling query for
itemType.*scrollIdentifier=dummyScrollId");
+ addIgnoredPattern("Cannot parse scroll id");
+ addIgnoredPattern("Request failed:.*illegal_argument_exception.*Cannot
parse scroll id");
+
+ // Index and mapping errors (expected during test setup/teardown or
when testing mapping scenarios)
+ addIgnoredPattern("Could not find index.*could not register item
type");
+ addIgnoredPattern("mapper_parsing_exception.*tried to parse field.*as
object, but found a concrete value");
// Condition validation errors (expected in tests with invalid
conditions)
addIgnoredPattern("Failed to validate condition");
diff --git
a/services-common/src/main/java/org/apache/unomi/services/common/cache/AbstractMultiTypeCachingService.java
b/services-common/src/main/java/org/apache/unomi/services/common/cache/AbstractMultiTypeCachingService.java
index 9d8ea9c06..2969451dd 100644
---
a/services-common/src/main/java/org/apache/unomi/services/common/cache/AbstractMultiTypeCachingService.java
+++
b/services-common/src/main/java/org/apache/unomi/services/common/cache/AbstractMultiTypeCachingService.java
@@ -16,11 +16,7 @@
*/
package org.apache.unomi.services.common.cache;
-import org.apache.unomi.api.Item;
-import org.apache.unomi.api.Metadata;
-import org.apache.unomi.api.MetadataItem;
-import org.apache.unomi.api.Parameter;
-import org.apache.unomi.api.PluginType;
+import org.apache.unomi.api.*;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.SchedulerService;
@@ -127,7 +123,7 @@ public abstract class AbstractMultiTypeCachingService
extends AbstractContextAwa
initializeTimers();
- logger.info("{} service initialized.", getClass().getSimpleName());
+ logger.debug("{} service initialized.", getClass().getSimpleName());
}
/**
@@ -204,7 +200,7 @@ public abstract class AbstractMultiTypeCachingService
extends AbstractContextAwa
.schedule();
scheduledRefreshTasks.put(taskName, scheduledTask);
- logger.info("Scheduled cache refresh for type: {}",
config.getType().getSimpleName());
+ logger.debug("Scheduled cache refresh for type: {}",
config.getType().getSimpleName());
}
protected void shutdownTimers() {
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
index 4a34f05af..568dfc4ac 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
@@ -18,21 +18,21 @@
package org.apache.unomi.services.impl.definitions;
import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.PluginType;
import org.apache.unomi.api.PropertyMergeStrategyType;
import org.apache.unomi.api.ValueType;
import org.apache.unomi.api.actions.ActionType;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.conditions.ConditionType;
-import org.apache.unomi.api.PluginType;
import org.apache.unomi.api.services.ConditionValidationService;
import
org.apache.unomi.api.services.ConditionValidationService.ValidationError;
import
org.apache.unomi.api.services.ConditionValidationService.ValidationErrorType;
import org.apache.unomi.api.services.DefinitionsService;
import org.apache.unomi.api.services.TenantLifecycleListener;
-import org.apache.unomi.api.utils.ParserHelper;
import org.apache.unomi.api.services.cache.CacheableTypeConfig;
import org.apache.unomi.api.services.cache.MultiTypeCacheService;
import org.apache.unomi.api.utils.ConditionBuilder;
+import org.apache.unomi.api.utils.ParserHelper;
import org.apache.unomi.services.common.cache.AbstractMultiTypeCachingService;
import org.apache.unomi.tracing.api.RequestTracer;
import org.apache.unomi.tracing.api.TracerService;
@@ -104,7 +104,7 @@ public class DefinitionsServiceImpl extends
AbstractMultiTypeCachingService impl
return null;
});
- LOGGER.info("Definitions service initialized.");
+ LOGGER.debug("Definitions service initialized.");
}
/**
@@ -218,7 +218,7 @@ public class DefinitionsServiceImpl extends
AbstractMultiTypeCachingService impl
@Override
public ConditionType getConditionType(String id) {
ConditionType conditionType = getItem(id, ConditionType.class);
- if (conditionType != null && conditionType.getParentCondition() !=
null
+ if (conditionType != null && conditionType.getParentCondition() != null
&& conditionType.getParentCondition().getConditionType() ==
null) {
// Resolve parent condition on demand if not already resolved
boolean resolved = ParserHelper.resolveConditionType(this,
conditionType.getParentCondition(),
@@ -660,7 +660,7 @@ public class DefinitionsServiceImpl extends
AbstractMultiTypeCachingService impl
BiConsumer<BundleContext, ConditionType> conditionTypeProcessor =
(bundleContext, type) -> {
type.setPluginId(bundleContext.getBundle().getBundleId());
type.setTenantId(SYSTEM_TENANT);
-
+
// Try to resolve parent condition if present, but don't fail if
parent isn't loaded yet
// The post-refresh callback will resolve it later once all items
are loaded
if (type.getParentCondition() != null &&
type.getParentCondition().getConditionType() == null) {
@@ -671,7 +671,7 @@ public class DefinitionsServiceImpl extends
AbstractMultiTypeCachingService impl
type.getItemId());
}
}
-
+
setConditionType(type);
};
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
index 52b7cf834..d11167eb9 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
@@ -22,9 +22,9 @@ import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.api.tasks.ScheduledTask;
import org.apache.unomi.api.tasks.ScheduledTask.TaskStatus;
import org.apache.unomi.api.tasks.TaskExecutor;
+import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.osgi.framework.BundleContext;
import java.time.Duration;
import java.time.ZonedDateTime;
@@ -595,7 +595,7 @@ public class SchedulerServiceImpl implements
SchedulerService {
public void setPersistenceProvider(SchedulerProvider persistenceProvider) {
this.persistenceProvider = persistenceProvider;
- LOGGER.info("PersistenceSchedulerProvider bound to SchedulerService");
+ LOGGER.debug("PersistenceSchedulerProvider bound to SchedulerService");
// Clear any expired operations first
clearExpiredOperations();
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/validation/ConditionValidationServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/validation/ConditionValidationServiceImpl.java
index ef5a930ae..50bbefd2b 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/validation/ConditionValidationServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/validation/ConditionValidationServiceImpl.java
@@ -41,7 +41,7 @@ public class ConditionValidationServiceImpl implements
ConditionValidationServic
for (ValueTypeValidator validator : builtInValidators) {
validators.put(validator.getValueTypeId().toLowerCase(),
validator);
}
- LOGGER.info("Initialized with {} built-in validators",
builtInValidators.size());
+ LOGGER.debug("Initialized with {} built-in validators",
builtInValidators.size());
}
public void bindValidator(ValueTypeValidator validator) {
diff --git
a/services/src/test/java/org/apache/unomi/services/impl/InMemoryPersistenceServiceImpl.java
b/services/src/test/java/org/apache/unomi/services/impl/InMemoryPersistenceServiceImpl.java
index d51933732..c0c4401bc 100644
---
a/services/src/test/java/org/apache/unomi/services/impl/InMemoryPersistenceServiceImpl.java
+++
b/services/src/test/java/org/apache/unomi/services/impl/InMemoryPersistenceServiceImpl.java
@@ -45,8 +45,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.Arrays;
-import java.util.Collection;
/**
* An in-memory implementation of PersistenceService for testing purposes.
@@ -56,7 +54,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryPersistenceServiceImpl.class);
private static final Pattern SAFE_FILENAME_PATTERN =
Pattern.compile("[^a-zA-Z0-9-_.]");
public static final String DEFAULT_STORAGE_DIR = "data/persistence";
-
+
// System items list - matches Elasticsearch/OpenSearch persistence
services
// System items have their itemType appended to the document ID:
tenantId_itemId_itemType
private static final Collection<String> systemItems =
Arrays.asList("actionType", "campaign", "campaignevent", "goal", "userList",
@@ -84,20 +82,20 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
private final Set<String> refreshedIndexes =
ConcurrentHashMap.newKeySet(); // indexes that have been refreshed
private Thread refreshThread;
private volatile boolean shutdownRefreshThread = false;
-
+
// Refresh policy per item type (simulates Elasticsearch/OpenSearch
refresh policies)
// Valid values: False/NONE (default - wait for automatic refresh),
True/IMMEDIATE (immediate refresh), WaitFor/WAIT_UNTIL (wait for refresh)
private final Map<String, RefreshPolicy> itemTypeToRefreshPolicy = new
ConcurrentHashMap<>();
-
+
// Default query limit (simulates Elasticsearch/OpenSearch default query
limit)
private Integer defaultQueryLimit = 10;
-
+
// Tenant transformation listeners (simulates Elasticsearch/OpenSearch
tenant transformations)
private final
List<org.apache.unomi.api.tenants.TenantTransformationListener>
transformationListeners = new ArrayList<>();
-
+
/**
* Refresh policy enum that simulates Elasticsearch/OpenSearch refresh
behavior.
- *
+ *
* - FALSE/NONE: Don't refresh immediately, wait for automatic refresh
(default behavior)
* - TRUE/IMMEDIATE: Force an immediate refresh after indexing
* - WAIT_FOR/WAIT_UNTIL: Wait for refresh to complete before returning
(similar to True but with different semantics)
@@ -108,13 +106,13 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
* This is the default and most efficient option.
*/
FALSE,
-
+
/**
* TRUE/IMMEDIATE - Force an immediate refresh after indexing.
* Changes are immediately visible but more resource-intensive.
*/
TRUE,
-
+
/**
* WAIT_FOR/WAIT_UNTIL - Wait for refresh to complete before returning.
* Similar to TRUE but ensures the refresh operation completes before
the request returns.
@@ -157,7 +155,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
String storageDir, boolean enableFileStorage, boolean
clearStorageOnInit, boolean prettyPrintJson, boolean simulateRefreshDelay, long
refreshIntervalMs) {
this(executionContextManager, conditionEvaluatorDispatcher,
storageDir, enableFileStorage, clearStorageOnInit, prettyPrintJson,
simulateRefreshDelay, refreshIntervalMs, 10);
}
-
+
public InMemoryPersistenceServiceImpl(ExecutionContextManager
executionContextManager, ConditionEvaluatorDispatcher
conditionEvaluatorDispatcher,
String storageDir, boolean enableFileStorage, boolean
clearStorageOnInit, boolean prettyPrintJson, boolean simulateRefreshDelay, long
refreshIntervalMs, int defaultQueryLimit) {
this.executionContextManager = executionContextManager;
@@ -194,20 +192,25 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
Files.createDirectories(storageRootPath);
loadPersistedItems();
} catch (IOException e) {
- LOGGER.error("Failed to create or access storage directory:
{}", storageRootPath, e);
+ // For AccessDeniedException (common in tests), log without
stack trace to reduce noise
+ if (e instanceof java.nio.file.AccessDeniedException) {
+ LOGGER.error("Failed to create or access storage
directory: {} - {}", storageRootPath, e.getMessage());
+ } else {
+ LOGGER.error("Failed to create or access storage
directory: {}", storageRootPath, e);
+ }
throw new RuntimeException("Failed to initialize storage", e);
}
} else {
this.objectMapper = null;
this.storageRootPath = null;
}
-
+
// Start background refresh thread if refresh delay simulation is
enabled
if (simulateRefreshDelay) {
startRefreshThread();
}
}
-
+
/**
* Starts the background thread that periodically refreshes indexes,
simulating Elasticsearch/OpenSearch behavior.
* By default, Elasticsearch refreshes indexes every 1 second, making
newly indexed documents searchable.
@@ -229,7 +232,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
refreshThread.setDaemon(true);
refreshThread.start();
}
-
+
/**
* Performs periodic refresh of pending items, making them available for
querying.
* This simulates Elasticsearch's automatic refresh behavior.
@@ -237,13 +240,13 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
private void performPeriodicRefresh() {
long currentTime = System.currentTimeMillis();
Set<String> itemsToRefresh = new HashSet<>();
-
+
for (Map.Entry<String, Long> entry : pendingRefreshItems.entrySet()) {
if (entry.getValue() <= currentTime) {
itemsToRefresh.add(entry.getKey());
}
}
-
+
if (!itemsToRefresh.isEmpty()) {
for (String itemKey : itemsToRefresh) {
pendingRefreshItems.remove(itemKey);
@@ -256,7 +259,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
LOGGER.debug("Periodically refreshed {} items",
itemsToRefresh.size());
}
}
-
+
/**
* Shuts down the refresh thread. Should be called when the service is
being destroyed.
*/
@@ -345,12 +348,12 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
Path itemPath = getItemPath(item);
String pathKey = itemPath.toString();
Object lock = fileLocks.computeIfAbsent(pathKey, k -> new Object());
-
+
synchronized (lock) {
// Retry logic for handling transient file system issues in
concurrent scenarios
int maxRetries = 3;
int retryCount = 0;
-
+
while (retryCount < maxRetries) {
try {
// Create parent directories, handling race conditions
where directory might already exist
@@ -381,7 +384,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
}
}
-
+
String json = objectMapper.writeValueAsString(item);
Files.writeString(itemPath, json, StandardCharsets.UTF_8);
// Success - break out of retry loop
@@ -423,7 +426,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
Path itemPath = getItemPath(item);
String pathKey = itemPath.toString();
Object lock = fileLocks.computeIfAbsent(pathKey, k -> new Object());
-
+
synchronized (lock) {
try {
Files.deleteIfExists(itemPath);
@@ -489,12 +492,12 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
private <T extends Item> String getKey(String itemId, String index) {
return index + ":" + itemId + ":" +
executionContextManager.getCurrentContext().getTenantId();
}
-
+
/**
* Get the document ID for an item type, matching Elasticsearch/OpenSearch
format.
* For system items, the format is: tenantId_itemId_itemType
* For non-system items, the format is: tenantId_itemId
- *
+ *
* @param itemId the item ID
* @param itemType the item type
* @return the document ID
@@ -504,10 +507,10 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
String baseId = systemItems.contains(itemType) ? (itemId + "_" +
itemType.toLowerCase()) : itemId;
return tenantId + "_" + baseId;
}
-
+
/**
* Strip tenant prefix from document ID, matching Elasticsearch/OpenSearch
format.
- *
+ *
* @param documentId the document ID
* @return the document ID without tenant prefix
*/
@@ -526,12 +529,12 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
return documentId;
}
-
+
/**
* Extract itemId from document ID for system items, matching
Elasticsearch/OpenSearch behavior.
* For system items, document ID format is: tenantId_itemId_itemType
* This method removes the tenant prefix and itemType suffix to get the
actual itemId.
- *
+ *
* @param documentId the document ID (may include tenant prefix)
* @param itemType the item type
* @return the extracted itemId
@@ -604,10 +607,10 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
item.setTenantId(executionContextManager.getCurrentContext().getTenantId());
-
+
// Apply tenant transformations before save (simulates
Elasticsearch/OpenSearch behavior)
item = handleItemTransformation(item);
-
+
String indexName = getIndexName(item);
String key = getKey(item.getItemId(), indexName);
Item existingItem = itemsById.get(key);
@@ -681,7 +684,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
}
RefreshPolicy refreshPolicy = getRefreshPolicy(itemType, item);
-
+
switch (refreshPolicy) {
case TRUE:
case WAIT_FOR:
@@ -689,7 +692,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// For WAIT_FOR, we also ensure refresh completes (same
behavior as TRUE in in-memory)
pendingRefreshItems.remove(key);
refreshedIndexes.add(indexName);
- LOGGER.debug("Immediately refreshed item {} of type {} due
to refresh policy {}",
+ LOGGER.debug("Immediately refreshed item {} of type {} due
to refresh policy {}",
item.getItemId(), itemType, refreshPolicy);
break;
case FALSE:
@@ -705,12 +708,12 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
return true;
}
-
+
/**
* Gets the refresh policy for a given item type.
* Checks for request-based override in item's system metadata first,
* then falls back to per-item-type policy, and finally defaults to FALSE.
- *
+ *
* @param itemType the item type
* @param item the item (may be null) - used to check for request-based
override
* @return the refresh policy for this item type
@@ -733,26 +736,26 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
}
}
-
+
// Fall back to per-item-type policy
return itemTypeToRefreshPolicy.getOrDefault(itemType,
RefreshPolicy.FALSE);
}
-
+
/**
* Gets the refresh policy for a given item type (without item context).
* Used when item is not available.
- *
+ *
* @param itemType the item type
* @return the refresh policy for this item type
*/
private RefreshPolicy getRefreshPolicy(String itemType) {
return itemTypeToRefreshPolicy.getOrDefault(itemType,
RefreshPolicy.FALSE);
}
-
+
/**
* Sets the refresh policy for a specific item type.
* This simulates Elasticsearch/OpenSearch's itemTypeToRefreshPolicy
configuration.
- *
+ *
* @param itemType the item type
* @param refreshPolicy the refresh policy (FALSE, TRUE, or WAIT_FOR)
*/
@@ -762,18 +765,18 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
LOGGER.debug("Set refresh policy for item type {} to {}",
itemType, refreshPolicy);
}
}
-
+
/**
* Sets refresh policies from a JSON string, matching
Elasticsearch/OpenSearch configuration format.
* Example: {"event":"WAIT_FOR","rule":"FALSE","scheduledTask":"TRUE"}
- *
+ *
* @param refreshPolicyJson JSON string mapping item types to refresh
policies
*/
public void setItemTypeToRefreshPolicy(String refreshPolicyJson) {
if (refreshPolicyJson == null || refreshPolicyJson.trim().isEmpty()) {
return;
}
-
+
try {
if (objectMapper != null) {
@SuppressWarnings("unchecked")
@@ -795,11 +798,11 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
LOGGER.error("Failed to parse refresh policy JSON: {}",
refreshPolicyJson, e);
}
}
-
+
/**
* Parses refresh policy string values from Elasticsearch/OpenSearch
configuration.
* Supports: False/NONE, True/IMMEDIATE, WaitFor/WAIT_UNTIL
- *
+ *
* @param policyStr the policy string
* @return the corresponding RefreshPolicy enum value
*/
@@ -807,7 +810,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
if (policyStr == null) {
return RefreshPolicy.FALSE;
}
-
+
String upper = policyStr.toUpperCase();
if ("FALSE".equals(upper) || "NONE".equals(upper)) {
return RefreshPolicy.FALSE;
@@ -816,15 +819,15 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
} else if ("WAITFOR".equals(upper) || "WAIT_FOR".equals(upper) ||
"WAIT_UNTIL".equals(upper)) {
return RefreshPolicy.WAIT_FOR;
}
-
+
LOGGER.warn("Unknown refresh policy value: {}, defaulting to FALSE",
policyStr);
return RefreshPolicy.FALSE;
}
-
+
/**
* Checks if an item is available for querying (i.e., has been refreshed).
* In Elasticsearch/OpenSearch, items are not immediately available after
indexing.
- *
+ *
* @param itemKey the item key (format: "index:itemId:tenantId")
* @param indexName the index name
* @return true if the item is available for querying, false otherwise
@@ -833,19 +836,19 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
if (!simulateRefreshDelay) {
return true; // If refresh delay is disabled, all items are
immediately available
}
-
+
// If the index has been explicitly refreshed, all items in it are
available
if (refreshedIndexes.contains(indexName)) {
return true;
}
-
+
// Check if this specific item has passed its refresh time
Long refreshTime = pendingRefreshItems.get(itemKey);
if (refreshTime == null) {
// Item is not in pending list, so it's available (e.g., loaded
from file)
return true;
}
-
+
return System.currentTimeMillis() >= refreshTime;
}
@@ -900,10 +903,10 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// So we need to query all items, not just refreshed ones
String currentTenantId =
executionContextManager.getCurrentContext().getTenantId();
List<T> itemsToRemove = new ArrayList<>();
-
+
for (Map.Entry<String, Item> entry : itemsById.entrySet()) {
Item item = entry.getValue();
- if (clazz.isAssignableFrom(item.getClass()) &&
+ if (clazz.isAssignableFrom(item.getClass()) &&
currentTenantId.equals(item.getTenantId())) {
// Check condition if provided (but don't filter by refresh
status for delete operations)
if (condition == null || testMatch(condition, item)) {
@@ -911,7 +914,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
}
}
-
+
for (T item : itemsToRemove) {
remove(item.getItemId(), clazz);
}
@@ -1109,7 +1112,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// This simulates Elasticsearch's refresh() API which forces an
immediate refresh
int itemsRefreshed = pendingRefreshItems.size();
Set<String> indexesToRefresh = new HashSet<>();
-
+
for (Map.Entry<String, Long> entry :
pendingRefreshItems.entrySet()) {
String itemKey = entry.getKey();
// Extract index name from itemKey (format:
"index:itemId:tenantId")
@@ -1118,10 +1121,10 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
indexesToRefresh.add(parts[0]);
}
}
-
+
// Clear all pending items
pendingRefreshItems.clear();
-
+
// Mark all indexes as refreshed
refreshedIndexes.addAll(indexesToRefresh);
LOGGER.debug("Manually refreshed all indexes, made {} items
immediately available", itemsRefreshed);
@@ -1202,7 +1205,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// This simulates Elasticsearch's refreshIndex() API which forces
an immediate refresh of a specific index
String indexName = getIndex(clazz);
int itemsRefreshed = 0;
-
+
// Remove all pending items for this index from the pending list
Set<String> keysToRemove = new HashSet<>();
for (Map.Entry<String, Long> entry :
pendingRefreshItems.entrySet()) {
@@ -1214,18 +1217,18 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
itemsRefreshed++;
}
}
-
+
for (String key : keysToRemove) {
pendingRefreshItems.remove(key);
}
-
+
// Mark this index as refreshed
refreshedIndexes.add(indexName);
- LOGGER.debug("Manually refreshed index {} for class {} with date
hint {}, made {} items immediately available",
+ LOGGER.debug("Manually refreshed index {} for class {} with date
hint {}, made {} items immediately available",
indexName, clazz.getName(), dateHint, itemsRefreshed);
} else {
if (clazz != null) {
- LOGGER.debug("RefreshIndex called for class {} with date hint
{} (refresh delay simulation disabled)",
+ LOGGER.debug("RefreshIndex called for class {} with date hint
{} (refresh delay simulation disabled)",
clazz.getName(), dateHint);
}
}
@@ -1484,7 +1487,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
if (fileStorageEnabled) {
persistItem(existingItem);
}
-
+
// Handle refresh policy per item type for updates (same as save)
// Request-based override (via system metadata) takes precedence
over per-item-type policy
if (simulateRefreshDelay) {
@@ -1497,14 +1500,14 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
String indexName = getIndexName(existingItem);
RefreshPolicy refreshPolicy = getRefreshPolicy(itemType,
existingItem);
-
+
switch (refreshPolicy) {
case TRUE:
case WAIT_FOR:
// Immediate refresh: make item available immediately
pendingRefreshItems.remove(key);
refreshedIndexes.add(indexName);
- LOGGER.debug("Immediately refreshed updated item {} of
type {} due to refresh policy {}",
+ LOGGER.debug("Immediately refreshed updated item {} of
type {} due to refresh policy {}",
existingItem.getItemId(), itemType,
refreshPolicy);
break;
case FALSE:
@@ -1516,7 +1519,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
break;
}
}
-
+
return true;
} catch (Exception e) {
LOGGER.error("Error updating item", e);
@@ -1650,11 +1653,11 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
@SuppressWarnings({"unchecked", "rawtypes"})
List<Item> items = new ArrayList<>();
String currentTenantId =
executionContextManager.getCurrentContext().getTenantId();
-
+
// Query all items directly from itemsById, not filtering by
refresh status
for (Map.Entry<String, Item> entry : itemsById.entrySet()) {
Item item = entry.getValue();
- if (clazz.isAssignableFrom(item.getClass()) &&
+ if (clazz.isAssignableFrom(item.getClass()) &&
currentTenantId.equals(item.getTenantId())) {
// Check condition if provided (but don't filter by
refresh status)
if (conditions[i] == null || testMatch(conditions[i],
item)) {
@@ -1662,7 +1665,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
}
}
-
+
for (Item item : items) {
success &= updateWithScript(item, dateHint, clazz, scripts[i],
scriptParams[i]);
}
@@ -1706,7 +1709,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
List<CustomItem> customItems = new ArrayList<>();
for (Map.Entry<String, Item> entry : itemsById.entrySet()) {
Item item = entry.getValue();
- if (item instanceof CustomItem &&
customItemType.equals(item.getItemType())
+ if (item instanceof CustomItem &&
customItemType.equals(item.getItemType())
&& currentTenantId.equals(item.getTenantId())) {
String itemKey = entry.getKey();
if (isItemAvailableForQuery(itemKey, customItemType) &&
(condition == null || testMatch(condition, item))) {
@@ -2752,11 +2755,11 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
private String getIndex(Class<?> clazz) {
return Item.getItemType(clazz);
}
-
+
/**
* Applies tenant transformations to an item before save/update.
* This simulates Elasticsearch/OpenSearch tenant transformation behavior.
- *
+ *
* @param item the item to transform
* @param <T> the item type
* @return the transformed item (or original if no transformations applied)
@@ -2769,7 +2772,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// Sort listeners by priority (higher priority first)
List<org.apache.unomi.api.tenants.TenantTransformationListener> sortedListeners
= new ArrayList<>(transformationListeners);
sortedListeners.sort((a, b) ->
Integer.compare(b.getPriority(), a.getPriority()));
-
+
for (org.apache.unomi.api.tenants.TenantTransformationListener
listener : sortedListeners) {
if (listener.isTransformationEnabled()) {
try {
@@ -2788,11 +2791,11 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
return item;
}
-
+
/**
* Applies reverse tenant transformations to an item after load.
* This simulates Elasticsearch/OpenSearch tenant reverse transformation
behavior.
- *
+ *
* @param item the item to reverse transform
* @param <T> the item type
* @return the reverse transformed item (or original if no transformations
applied)
@@ -2805,7 +2808,7 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
// Sort listeners by priority (higher priority first) for
reverse transformation
List<org.apache.unomi.api.tenants.TenantTransformationListener> sortedListeners
= new ArrayList<>(transformationListeners);
sortedListeners.sort((a, b) ->
Integer.compare(b.getPriority(), a.getPriority()));
-
+
for (org.apache.unomi.api.tenants.TenantTransformationListener
listener : sortedListeners) {
if (listener.isTransformationEnabled()) {
try {
@@ -2824,11 +2827,11 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
}
return item;
}
-
+
/**
* Adds a tenant transformation listener (for testing purposes).
* This simulates OSGi service registration in Elasticsearch/OpenSearch
implementations.
- *
+ *
* @param listener the transformation listener to add
*/
public void
addTransformationListener(org.apache.unomi.api.tenants.TenantTransformationListener
listener) {
@@ -2837,10 +2840,10 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
LOGGER.debug("Added tenant transformation listener: {}",
listener.getTransformationType());
}
}
-
+
/**
* Removes a tenant transformation listener (for testing purposes).
- *
+ *
* @param listener the transformation listener to remove
*/
public void
removeTransformationListener(org.apache.unomi.api.tenants.TenantTransformationListener
listener) {
@@ -2849,19 +2852,19 @@ public class InMemoryPersistenceServiceImpl implements
PersistenceService {
LOGGER.debug("Removed tenant transformation listener: {}",
listener.getTransformationType());
}
}
-
+
/**
* Gets the default query limit.
- *
+ *
* @return the default query limit
*/
public Integer getDefaultQueryLimit() {
return defaultQueryLimit;
}
-
+
/**
* Sets the default query limit.
- *
+ *
* @param defaultQueryLimit the default query limit to set
*/
public void setDefaultQueryLimit(Integer defaultQueryLimit) {