This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch UNOMI-225-ES7
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/UNOMI-225-ES7 by this push:
new b5eee5a ES7 support code cleanup mostly Persistence Service Impl,
removing unnecessary stuff.
b5eee5a is described below
commit b5eee5a9b12dcfa7c01f9655e9e32fea4e60987d
Author: Kevan <[email protected]>
AuthorDate: Fri Nov 22 12:43:47 2019 +0100
ES7 support code cleanup mostly Persistence Service Impl, removing
unnecessary stuff.
---
.../ElasticSearchPersistenceServiceImpl.java | 70 +++++++++++-----------
.../unomi/persistence/spi/PersistenceService.java | 12 ++--
2 files changed, 42 insertions(+), 40 deletions(-)
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 23bb970..478b44f 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -52,7 +52,6 @@ import org.elasticsearch.client.*;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
@@ -132,7 +131,6 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher
conditionESQueryBuilderDispatcher;
- private Map<String, String> indexNames = new HashMap<>();
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
@@ -145,7 +143,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private String bulkProcessorBackoffPolicy = "exponential";
private String minimalElasticSearchVersion = "7.0.0";
- private String maximalElasticSearchVersion = "7.4.0";
+ private String maximalElasticSearchVersion = "8.0.0";
private int aggregateQueryBucketSize = 5000;
@@ -202,10 +200,6 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
this.itemsMonthlyIndexed = itemsMonthlyIndexed;
}
- public void setIndexNames(Map<String, String> indexNames) {
- this.indexNames = indexNames;
- }
-
public void setRoutingByType(Map<String, String> routingByType) {
this.routingByType = routingByType;
}
@@ -299,9 +293,6 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
logger.info("Overriding cluster name from system
property=" + clusterName);
}
- Settings.Builder transportSettings = Settings.builder()
- .put(CLUSTER_NAME, clusterName);
-
List<Node> nodeList = new ArrayList<>();
for (String elasticSearchAddress : elasticSearchAddressList) {
String[] elasticSearchAddressParts =
elasticSearchAddress.split(":");
@@ -310,7 +301,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
nodeList.add(new Node(new HttpHost(elasticSearchHostName,
elasticSearchPort, "http")));
}
- logger.info("Connecting to ElasticSearch persistence backend
using cluster name " + clusterName + " and index name " + indexPrefix + "...");
+ logger.info("Connecting to ElasticSearch persistence backend
using cluster name " + clusterName + " and index prefix " + indexPrefix +
"...");
client = new RestHighLevelClient(
RestClient.builder(nodeList.toArray(new
Node[nodeList.size()])));
@@ -809,7 +800,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
BulkRequest deleteByScopeBulkRequest = new BulkRequest();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
- SearchRequest searchRequest = new
SearchRequest(indexPrefix + "*")
+ SearchRequest searchRequest = new
SearchRequest(getAllIndexForQuery())
.indices(getIndexNameForQuery(itemType))
.scroll(keepAlive);
SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder()
@@ -900,7 +891,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
boolean executedSuccessfully = true;
for (String itemName : itemsMonthlyIndexed) {
PutIndexTemplateRequest putIndexTemplateRequest = new
PutIndexTemplateRequest("context-"+itemName+"-date-template")
- .patterns(Arrays.asList(indexPrefix +
"-"+itemName.toLowerCase()+"-" + INDEX_DATE_PREFIX + "*"))
+
.patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName)))
.settings("{\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " +
monthlyIndexNumberOfShards + ",\n" +
@@ -930,17 +921,20 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
- public boolean createIndex(final String indexName) {
+ public boolean createIndex(final String itemType) {
+ String index = getIndex(itemType);
+
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".createIndex") {
protected Boolean execute(Object... args) throws IOException {
- GetIndexRequest getIndexRequest = new
GetIndexRequest((indexPrefix + "-" + indexName).toLowerCase());
+ GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
if (!indexExists) {
- internalCreateIndex((indexPrefix + "-" +
indexName).toLowerCase(), mappings.get(indexName));
+ internalCreateIndex(index, mappings.get(itemType));
}
return !indexExists;
}
}.catchingExecuteInClassLoader(true);
+
if (result == null) {
return false;
} else {
@@ -948,18 +942,21 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
- public boolean removeIndex(final String indexName) {
+ public boolean removeIndex(final String itemType) {
+ String index = getIndex(itemType);
+
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".removeIndex") {
protected Boolean execute(Object... args) throws IOException {
- GetIndexRequest getIndexRequest = new
GetIndexRequest(indexPrefix + "-" + indexName);
+ GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
if (indexExists) {
- DeleteIndexRequest deleteIndexRequest = new
DeleteIndexRequest(indexPrefix + "-" + indexName);
+ DeleteIndexRequest deleteIndexRequest = new
DeleteIndexRequest(index);
client.indices().delete(deleteIndexRequest,
RequestOptions.DEFAULT);
}
return indexExists;
}
}.catchingExecuteInClassLoader(true);
+
if (result == null) {
return false;
} else {
@@ -1001,13 +998,8 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
if (client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT)) {
putMapping(source, indexName);
}
- } else if (indexNames.containsKey(type)) {
- GetIndexRequest getIndexRequest = new
GetIndexRequest(indexNames.get(type));
- if (client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT)) {
- putMapping(source, indexNames.get(type));
- }
} else {
- putMapping(source, getIndex(type, new Date()));
+ putMapping(source, getIndex(type));
}
} catch (IOException ioe) {
logger.error("Error while creating mapping for type " + type + "
and source " + source, ioe);
@@ -1020,7 +1012,6 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
client.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
}
-
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String
itemType) {
return new InClassLoaderExecute<Map<String, Map<String,
Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") {
@@ -1640,7 +1631,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
@Override
protected Object execute(Object... args) throws Exception {
- GetIndexRequest getIndexRequest = new
GetIndexRequest(indexPrefix + "*");
+ GetIndexRequest getIndexRequest = new
GetIndexRequest(getAllIndexForQuery());
GetIndexResponse getIndexResponse =
client.indices().get(getIndexRequest, RequestOptions.DEFAULT);
String[] indices = getIndexResponse.getIndices();
@@ -1680,7 +1671,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
BulkRequest deleteByScopeBulkRequest = new BulkRequest();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
- SearchRequest searchRequest = new SearchRequest(indexPrefix +
"*").scroll(keepAlive);
+ SearchRequest searchRequest = new
SearchRequest(getAllIndexForQuery()).scroll(keepAlive);
SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder()
.query(query)
.size(100);
@@ -1780,10 +1771,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}.catchingExecuteInClassLoader(true);
}
- private String getIndexNameForQuery(String itemType) {
- return indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) ? indexPrefix + "-" +
itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + "*" : getIndex(itemType,
null));
- }
+
private String getConfig(Map<String, String> settings, String key,
String defaultValue) {
@@ -1869,10 +1857,24 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
return itemCache.remove(itemId);
}
+ private String getAllIndexForQuery() {
+ return indexPrefix + "*";
+ }
+
+ private String getIndexNameForQuery(String itemType) {
+ return itemsMonthlyIndexed.contains(itemType) ?
getMonthlyIndexForQuery(itemType) : getIndex(itemType, null);
+ }
+
+ private String getMonthlyIndexForQuery(String itemType) {
+ return indexPrefix + "-" + itemType.toLowerCase() + "-" +
INDEX_DATE_PREFIX + "*";
+ }
+
private String getIndex(String itemType, Date dateHint) {
- String indexItemTypePart = indexNames.containsKey(itemType) ?
indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) && dateHint != null ?
itemType + "-" + getMonthlyIndexPart(dateHint) : itemType);
+ String indexItemTypePart = itemsMonthlyIndexed.contains(itemType) &&
dateHint != null ? itemType + "-" + getMonthlyIndexPart(dateHint) : itemType;
+ return getIndex(indexItemTypePart);
+ }
+ private String getIndex(String indexItemTypePart) {
return (indexPrefix + "-" + indexItemTypePart).toLowerCase();
}
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index ede120d..0151b9c 100644
---
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -480,24 +480,24 @@ public interface PersistenceService {
Map<String, Double> getSingleValuesMetrics(Condition condition, String[]
metrics, String field, String type);
/**
- * Creates an index with the specified name in the persistence engine.
+ * Creates an index with for the specified item type in the persistence
engine.
*
* TODO: remove from API?
*
- * @param indexName the index name
+ * @param itemType the item type
* @return {@code true} if the operation was successful, {@code false}
otherwise
*/
- boolean createIndex(final String indexName);
+ boolean createIndex(final String itemType);
/**
- * Removes the index with the specified name.
+ * Removes the index for the specified item type.
*
* TODO: remove from API?
*
- * @param indexName the index name
+ * @param itemType the item type
* @return {@code true} if the operation was successful, {@code false}
otherwise
*/
- boolean removeIndex(final String indexName);
+ boolean removeIndex(final String itemType);
/**
* Removes all data associated with the provided scope.