This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch unomi-1.5.x in repository https://gitbox.apache.org/repos/asf/unomi.git
commit d9e4a717f955a8e9640e3b9e2e9696a491079295 Author: liatiusim <[email protected]> AuthorDate: Tue Oct 13 17:56:44 2020 +0200 Unomi - 387: Stop Unomi in case of an error which its cannot recovery from (#186) * Stop Unomi if reactor stopped * Use system bundle stop instead of system exit, define fatal errors in configuration Co-authored-by: Shir Bromberg <[email protected]> (cherry picked from commit a6999de528eafccd43076c622fca74d3807877db) --- .../main/resources/etc/custom.system.properties | 1 + .../ElasticSearchPersistenceServiceImpl.java | 90 +++++++++++++++------- .../resources/OSGI-INF/blueprint/blueprint.xml | 1 + .../org.apache.unomi.persistence.elasticsearch.cfg | 1 + 4 files changed, 64 insertions(+), 29 deletions(-) diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index dd58502..f55d9cf 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -72,6 +72,7 @@ org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM # hostA:9200,hostB:9200 # Note: the port number must be repeated for each host. org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200} +org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-} org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context} org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5} org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_REPLICAS:-0} diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index b86b368..530395b 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -148,6 +148,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String numberOfReplicas; private String indexMappingTotalFieldsLimit; private String indexMaxDocValueFieldsSearch; + private String[] fatalIllegalStateErrors; private BundleContext bundleContext; private Map<String, String> mappings = new HashMap<String, String>(); private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher; @@ -201,6 +202,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) { + this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(",")) + .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new); + } + public void setIndexPrefix(String indexPrefix) { this.indexPrefix = indexPrefix; } @@ -333,7 +339,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void start() throws Exception { // on startup - new InClassLoaderExecute<Object>(null, null) { + new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors) { public Object execute(Object... args) throws Exception { bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests); @@ -532,7 +538,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void stop() { - new InClassLoaderExecute<Object>(null, null) { + new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors) { protected Object execute(Object... args) throws IOException { logger.info("Closing ElasticSearch persistence backend..."); if (bulkProcessor != null) { @@ -662,7 +668,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) { - return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem") { + return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem", this.bundleContext, this.fatalIllegalStateErrors) { protected T execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -720,7 +726,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean save(final Item item, final boolean useBatching) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item); @@ -766,7 +772,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -792,7 +798,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -849,7 +855,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -881,7 +887,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -902,7 +908,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); @@ -967,7 +973,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean indexTemplateExists(final String templateName) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws IOException { IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(templateName); return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT); @@ -981,7 +987,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public boolean removeIndexTemplate(final String templateName) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws IOException { DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(templateName); AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT); @@ -996,7 +1002,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public boolean createMonthlyIndexTemplate() { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws IOException { boolean executedSuccessfully = true; for (String itemName : itemsMonthlyIndexed) { @@ -1037,7 +1043,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean createIndex(final String itemType) { String index = getIndex(itemType); - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws IOException { GetIndexRequest getIndexRequest = new GetIndexRequest(index); boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); @@ -1058,7 +1064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndex(final String itemType) { String index = getIndex(itemType); - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws IOException { GetIndexRequest getIndexRequest = new GetIndexRequest(index); boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); @@ -1129,7 +1135,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) { - return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") { + return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping", this.bundleContext, this.fatalIllegalStateErrors) { @SuppressWarnings("unchecked") protected Map<String, Map<String, Object>> execute(Object... args) throws Exception { // Get all mapping for current itemType @@ -1226,7 +1232,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public boolean saveQuery(final String queryName, final String query) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { @@ -1261,7 +1267,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean removeQuery(final String queryName) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery") { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { @@ -1382,7 +1388,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private long queryCount(final QueryBuilder filter, final String itemType) { - return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount") { + return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected Long execute(Object... args) throws IOException { @@ -1398,7 +1404,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) { - return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query") { + return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected PartialList<T> execute(Object... args) throws Exception { @@ -1524,7 +1530,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) { - return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery") { + return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected PartialList<T> execute(Object... args) throws Exception { @@ -1580,7 +1586,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType, final boolean optimizedQuery) { - return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery") { + return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected Map<String, Long> execute(Object... args) throws IOException { @@ -1741,7 +1747,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public void refresh() { - new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh") { + new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) { if (bulkProcessor != null) { bulkProcessor.flush(); @@ -1758,7 +1764,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint){ - new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex") { + new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) { try { String itemType = Item.getItemType(clazz); @@ -1776,7 +1782,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public void purge(final Date date) { - new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") { + new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected Object execute(Object... args) throws Exception { @@ -1812,7 +1818,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public void purge(final String scope) { - new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope") { + new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected Void execute(Object... args) throws IOException { QueryBuilder query = termQuery("scope", scope); @@ -1864,7 +1870,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) { - return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics") { + return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics", this.bundleContext, this.fatalIllegalStateErrors) { @Override protected Map<String, Double> execute(Object... args) throws IOException { @@ -1934,10 +1940,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String timerName; private MetricsService metricsService; + private BundleContext bundleContext; + private String[] fatalIllegalStateErrors; // Errors that if occur - stop the application - public InClassLoaderExecute(MetricsService metricsService, String timerName) { + public InClassLoaderExecute(MetricsService metricsService, String timerName, BundleContext bundleContext, String[] fatalIllegalStateErrors) { this.timerName = timerName; this.metricsService = metricsService; + this.bundleContext = bundleContext; + this.fatalIllegalStateErrors = fatalIllegalStateErrors; } protected abstract T execute(Object... args) throws Exception; @@ -1961,12 +1971,34 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { return executeInClassLoader(timerName, args); } catch (Throwable t) { - if (logError) { - logger.error("Error while executing in class loader", t); + Throwable tTemp = t; + // Go over the stack trace and check if there were any fatal state errors + while (tTemp != null) { + if (tTemp instanceof IllegalStateException && Arrays.stream(this.fatalIllegalStateErrors).anyMatch(tTemp.getMessage()::contains)) { + handleFatalStateError(); // Stop application + return null; + } + tTemp = tTemp.getCause(); } + handleError(t, logError); } return null; } + + private void handleError(Throwable t, boolean logError) { + if (logError) { + logger.error("Error while executing in class loader", t); + } + } + + private void handleFatalStateError() { + logger.error("Fatal state error occurred - stopping application"); + try { + this.bundleContext.getBundle(0).stop(); + } catch (Throwable tInner) { // Stopping system bundle failed - force exit + System.exit(-1); + } + } } private <T extends Item> boolean isCacheActiveForClass(String className) { diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 04bd53b..5888c26 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -104,6 +104,7 @@ <property name="indexMappingTotalFieldsLimit" value="${es.indexMappingTotalFieldsLimit}"/> <property name="indexMaxDocValueFieldsSearch" value="${es.indexMaxDocValueFieldsSearch}"/> <property name="elasticSearchAddresses" value="${es.elasticSearchAddresses}"/> + <property name="fatalIllegalStateErrors" value="${es.fatalIllegalStateErrors}"/> <property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/> <property name="itemsMonthlyIndexedOverride" value="${es.monthlyIndex.itemsMonthlyIndexedOverride}" /> <property name="routingByType"> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index ffecb4a..c6205ed 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -20,6 +20,7 @@ cluster.name=${org.apache.unomi.elasticsearch.cluster.name:-contextElasticSearch # hostA:9200,hostB:9200 # Note: the port number must be repeated for each host. elasticSearchAddresses=${org.apache.unomi.elasticsearch.addresses:-localhost:9200} +fatalIllegalStateErrors=${org.apache.unomi.elasticsearch.fatalIllegalStateErrors:-} index.prefix=${org.apache.unomi.elasticsearch.index.prefix:-context} monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShards:-5} monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}
