This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new a6999de Unomi - 387: Stop Unomi in case of an error which it cannot
recover from (#186)
a6999de is described below
commit a6999de528eafccd43076c622fca74d3807877db
Author: liatiusim <[email protected]>
AuthorDate: Tue Oct 13 18:56:44 2020 +0300
Unomi - 387: Stop Unomi in case of an error which it cannot recover from
(#186)
* Stop Unomi if reactor stopped
* Use system bundle stop instead of system exit, define fatal errors in
configuration
Co-authored-by: Shir Bromberg <[email protected]>
---
.../main/resources/etc/custom.system.properties | 1 +
.../ElasticSearchPersistenceServiceImpl.java | 90 +++++++++++++++-------
.../resources/OSGI-INF/blueprint/blueprint.xml | 1 +
.../org.apache.unomi.persistence.elasticsearch.cfg | 1 +
4 files changed, 64 insertions(+), 29 deletions(-)
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index dd58502..f55d9cf 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -72,6 +72,7 @@
org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_REPLICAS:-0}
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 2219a2b..dee1679 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -194,6 +194,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private String numberOfReplicas;
private String indexMappingTotalFieldsLimit;
private String indexMaxDocValueFieldsSearch;
+ private String[] fatalIllegalStateErrors;
private BundleContext bundleContext;
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
@@ -247,6 +248,11 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
+ this.fatalIllegalStateErrors =
Arrays.stream(fatalIllegalStateErrors.split(","))
+ .map(i -> i.trim()).filter(i ->
!i.isEmpty()).toArray(String[]::new);
+ }
+
public void setIndexPrefix(String indexPrefix) {
this.indexPrefix = indexPrefix;
}
@@ -379,7 +385,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
public void start() throws Exception {
// on startup
- new InClassLoaderExecute<Object>(null, null) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext,
this.fatalIllegalStateErrors) {
public Object execute(Object... args) throws Exception {
bulkProcessorConcurrentRequests =
System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS,
bulkProcessorConcurrentRequests);
@@ -578,7 +584,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
public void stop() {
- new InClassLoaderExecute<Object>(null, null) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext,
this.fatalIllegalStateErrors) {
protected Object execute(Object... args) throws IOException {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
@@ -708,7 +714,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public <T extends Item> T load(final String itemId, final Date dateHint,
final Class<T> clazz) {
- return new InClassLoaderExecute<T>(metricsService,
this.getClass().getName() + ".loadItem") {
+ return new InClassLoaderExecute<T>(metricsService,
this.getClass().getName() + ".loadItem", this.bundleContext,
this.fatalIllegalStateErrors) {
protected T execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -766,7 +772,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public boolean save(final Item item, final boolean useBatching) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".saveItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".saveItem", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String source =
ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
@@ -812,7 +818,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public boolean update(final String itemId, final Date dateHint, final
Class clazz, final Map source) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateItem", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -838,7 +844,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final
Class<?> clazz, final String[] scripts, final Map<String, Object>[]
scriptParams, final Condition[] conditions) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithQueryAndScript") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -895,7 +901,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public boolean updateWithScript(final String itemId, final Date dateHint,
final Class<?> clazz, final String script, final Map<String, Object>
scriptParams) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithScript") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithScript", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -927,7 +933,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T>
clazz) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeItem", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -948,7 +954,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
}
public <T extends Item> boolean removeByQuery(final Condition query, final
Class<T> clazz) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeByQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeByQuery", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -1013,7 +1019,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
public boolean indexTemplateExists(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".indexTemplateExists") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".indexTemplateExists", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
IndexTemplatesExistRequest indexTemplatesExistRequest = new
IndexTemplatesExistRequest(templateName);
return
client.indices().existsTemplate(indexTemplatesExistRequest,
RequestOptions.DEFAULT);
@@ -1027,7 +1033,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
public boolean removeIndexTemplate(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeIndexTemplate") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeIndexTemplate", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new
DeleteIndexTemplateRequest(templateName);
AcknowledgedResponse deleteIndexTemplateResponse =
client.indices().deleteTemplate(deleteIndexTemplateRequest,
RequestOptions.DEFAULT);
@@ -1042,7 +1048,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
public boolean createMonthlyIndexTemplate() {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".createMonthlyIndexTemplate") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".createMonthlyIndexTemplate", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
boolean executedSuccessfully = true;
for (String itemName : itemsMonthlyIndexed) {
@@ -1083,7 +1089,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
public boolean createIndex(final String itemType) {
String index = getIndex(itemType);
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".createIndex") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".createIndex", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
@@ -1104,7 +1110,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
public boolean removeIndex(final String itemType) {
String index = getIndex(itemType);
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeIndex") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeIndex", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
@@ -1251,7 +1257,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String
itemType) {
- return new InClassLoaderExecute<Map<String, Map<String,
Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") {
+ return new InClassLoaderExecute<Map<String, Map<String,
Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping",
this.bundleContext, this.fatalIllegalStateErrors) {
@SuppressWarnings("unchecked")
protected Map<String, Map<String, Object>> execute(Object... args)
throws Exception {
// Get all mapping for current itemType
@@ -1351,7 +1357,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
public boolean saveQuery(final String queryName, final String query) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".saveQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".saveQuery", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1386,7 +1392,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public boolean removeQuery(final String queryName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeQuery", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1507,7 +1513,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
private long queryCount(final QueryBuilder filter, final String itemType) {
- return new InClassLoaderExecute<Long>(metricsService,
this.getClass().getName() + ".queryCount") {
+ return new InClassLoaderExecute<Long>(metricsService,
this.getClass().getName() + ".queryCount", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected Long execute(Object... args) throws IOException {
@@ -1523,7 +1529,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
private <T extends Item> PartialList<T> query(final QueryBuilder query,
final String sortBy, final Class<T> clazz, final int offset, final int size,
final String[] routing, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService,
this.getClass().getName() + ".query") {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService,
this.getClass().getName() + ".query", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -1649,7 +1655,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public <T extends Item> PartialList<T> continueScrollQuery(final Class<T>
clazz, final String scrollIdentifier, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService,
this.getClass().getName() + ".continueScrollQuery") {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService,
this.getClass().getName() + ".continueScrollQuery", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -1705,7 +1711,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
private Map<String, Long> aggregateQuery(final Condition filter, final
BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery) {
- return new InClassLoaderExecute<Map<String, Long>>(metricsService,
this.getClass().getName() + ".aggregateQuery") {
+ return new InClassLoaderExecute<Map<String, Long>>(metricsService,
this.getClass().getName() + ".aggregateQuery", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected Map<String, Long> execute(Object... args) throws
IOException {
@@ -1866,7 +1872,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public void refresh() {
- new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".refresh") {
+ new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".refresh", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) {
if (bulkProcessor != null) {
bulkProcessor.flush();
@@ -1883,7 +1889,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint) {
- new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".refreshIndex") {
+ new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".refreshIndex", this.bundleContext,
this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) {
try {
String itemType = Item.getItemType(clazz);
@@ -1900,7 +1906,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public void purge(final Date date) {
- new InClassLoaderExecute<Object>(metricsService,
this.getClass().getName() + ".purgeWithDate") {
+ new InClassLoaderExecute<Object>(metricsService,
this.getClass().getName() + ".purgeWithDate", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected Object execute(Object... args) throws Exception {
@@ -1936,7 +1942,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public void purge(final String scope) {
- new InClassLoaderExecute<Void>(metricsService,
this.getClass().getName() + ".purgeWithScope") {
+ new InClassLoaderExecute<Void>(metricsService,
this.getClass().getName() + ".purgeWithScope", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected Void execute(Object... args) throws IOException {
QueryBuilder query = termQuery("scope", scope);
@@ -1988,7 +1994,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition
condition, final String[] metrics, final String field, final String itemType) {
- return new InClassLoaderExecute<Map<String, Double>>(metricsService,
this.getClass().getName() + ".getSingleValuesMetrics") {
+ return new InClassLoaderExecute<Map<String, Double>>(metricsService,
this.getClass().getName() + ".getSingleValuesMetrics", this.bundleContext,
this.fatalIllegalStateErrors) {
@Override
protected Map<String, Double> execute(Object... args) throws
IOException {
@@ -2057,10 +2063,14 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
private String timerName;
private MetricsService metricsService;
+ private BundleContext bundleContext;
+ private String[] fatalIllegalStateErrors; // Errors that if occur -
stop the application
- public InClassLoaderExecute(MetricsService metricsService, String
timerName) {
+ public InClassLoaderExecute(MetricsService metricsService, String
timerName, BundleContext bundleContext, String[] fatalIllegalStateErrors) {
this.timerName = timerName;
this.metricsService = metricsService;
+ this.bundleContext = bundleContext;
+ this.fatalIllegalStateErrors = fatalIllegalStateErrors;
}
protected abstract T execute(Object... args) throws Exception;
@@ -2084,12 +2094,34 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
try {
return executeInClassLoader(timerName, args);
} catch (Throwable t) {
- if (logError) {
- logger.error("Error while executing in class loader", t);
+ Throwable tTemp = t;
+ // Go over the stack trace and check if there were any fatal
state errors
+ while (tTemp != null) {
+ if (tTemp instanceof IllegalStateException &&
Arrays.stream(this.fatalIllegalStateErrors).anyMatch(tTemp.getMessage()::contains))
{
+ handleFatalStateError(); // Stop application
+ return null;
+ }
+ tTemp = tTemp.getCause();
}
+ handleError(t, logError);
}
return null;
}
+
+ private void handleError(Throwable t, boolean logError) {
+ if (logError) {
+ logger.error("Error while executing in class loader", t);
+ }
+ }
+
+ private void handleFatalStateError() {
+ logger.error("Fatal state error occurred - stopping application");
+ try {
+ this.bundleContext.getBundle(0).stop();
+ } catch (Throwable tInner) { // Stopping system bundle failed -
force exit
+ System.exit(-1);
+ }
+ }
}
private <T extends Item> boolean isCacheActiveForClass(String className) {
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 04bd53b..5888c26 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -104,6 +104,7 @@
<property name="indexMappingTotalFieldsLimit"
value="${es.indexMappingTotalFieldsLimit}"/>
<property name="indexMaxDocValueFieldsSearch"
value="${es.indexMaxDocValueFieldsSearch}"/>
<property name="elasticSearchAddresses"
value="${es.elasticSearchAddresses}"/>
+ <property name="fatalIllegalStateErrors"
value="${es.fatalIllegalStateErrors}"/>
<property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/>
<property name="itemsMonthlyIndexedOverride"
value="${es.monthlyIndex.itemsMonthlyIndexedOverride}" />
<property name="routingByType">
diff --git
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ffecb4a..c6205ed 100644
---
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -20,6 +20,7 @@
cluster.name=${org.apache.unomi.elasticsearch.cluster.name:-contextElasticSearch
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
elasticSearchAddresses=${org.apache.unomi.elasticsearch.addresses:-localhost:9200}
+fatalIllegalStateErrors=${org.apache.unomi.elasticsearch.fatalIllegalStateErrors:-}
index.prefix=${org.apache.unomi.elasticsearch.index.prefix:-context}
monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShards:-5}
monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}