This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 0847d6c ATLAS-4554: updated monitoring to support elasticsearch index backend 0847d6c is described below commit 0847d6c371a4482924ff642b525c633d7e0f5e6a Author: xingyu_zha <xingyu_...@intsig.net> AuthorDate: Mon Feb 14 17:56:13 2022 +0800 ATLAS-4554: updated monitoring to support elasticsearch index backend Signed-off-by: Madhan Neethiraj <mad...@apache.org> --- .../graphdb/janus/AtlasJanusGraphDatabase.java | 24 + .../graphdb/janus/AtlasJanusGraphIndexClient.java | 27 +- .../diskstorage/es/ElasticSearch7Index.java | 1299 ++++++++++++++++++++ .../org/apache/atlas/ApplicationProperties.java | 7 + .../repository/graph/IndexRecoveryService.java | 8 +- 5 files changed, 1355 insertions(+), 10 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index 3995cf9..cb3e8d9 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -37,6 +37,7 @@ import org.janusgraph.core.JanusGraphFactory; import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.diskstorage.StandardIndexProvider; import org.janusgraph.diskstorage.StandardStoreManager; +import org.janusgraph.diskstorage.es.ElasticSearch7Index; import org.janusgraph.diskstorage.solr.Solr6Index; import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer; import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry; @@ -122,6 +123,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, addHBase2Support(); addSolr6Index(); + + addElasticSearch7Index(); } private static void addHBase2Support() { @@ -164,6 +167,27 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, } } + private static void addElasticSearch7Index() { + try { + Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES"); + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses()); + customMap.put("elasticsearch", ElasticSearch7Index.class.getName()); + ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap); + field.set(null, immap); + + LOG.debug("Injected es7 index - {}", ElasticSearch7Index.class.getName()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static JanusGraph getGraphInstance() { if (graphInstance == null) { synchronized (AtlasJanusGraphDatabase.class) { diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java index 9e9fdd8..f9f3772 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.graphdb.janus; import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasAggregationEntry; import org.apache.atlas.repository.Constants; @@ -42,6 +43,8 @@ import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.TermsResponse; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.util.NamedList; +import org.janusgraph.diskstorage.es.ElasticSearch7Index; +import org.janusgraph.diskstorage.es.ElasticSearchClient; import org.janusgraph.diskstorage.solr.Solr6Index; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +73,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { private static final String TERMS_FIELD = "terms.fl"; private static final int SOLR_HEALTHY_STATUS = 0; private static final long SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min - private static long prevSolrHealthCheckTime; + private static long prevIdxHealthCheckTime; private final Configuration configuration; @@ -83,21 +86,26 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { public boolean isHealthy() { boolean isHealthy = false; long currentTime = System.currentTimeMillis(); + String idxBackEnd = configuration.getString(ApplicationProperties.INDEX_BACKEND_CONF); try { - if (isSolrHealthy()) { - isHealthy = true; + if (ApplicationProperties.INDEX_BACKEND_SOLR.equals(idxBackEnd)) { + isHealthy = isSolrHealthy(); + } else if (ApplicationProperties.INDEX_BACKEND_ELASTICSEARCH.equals(idxBackEnd)) { + isHealthy = isElasticsearchHealthy(); } + + LOG.info("indexBackEnd={}; isHealthy={}", idxBackEnd, isHealthy); } catch (Exception exception) { if (LOG.isDebugEnabled()) { LOG.error("Error: isHealthy", exception); } } - if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) { - LOG.info("Solr Health: Unhealthy!"); + if (!isHealthy && (prevIdxHealthCheckTime == 0 || currentTime - prevIdxHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) { + LOG.info("Backend Health: Unhealthy!"); - prevSolrHealthCheckTime = currentTime; + prevIdxHealthCheckTime = currentTime; } return isHealthy; @@ -379,6 +387,13 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS; } + private boolean isElasticsearchHealthy() throws IOException { + ElasticSearchClient client = ElasticSearch7Index.getElasticSearchClient(); + String janusVertexIndex = ApplicationProperties.DEFAULT_INDEX_NAME + "_" + Constants.VERTEX_INDEX; + + return client != null && client.indexExists(janusVertexIndex); + } + private void graphManagementCommit(AtlasGraphManagement management) { try { management.commit(); diff --git a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java new file mode 100644 index 0000000..51a87f5 --- /dev/null +++ b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java @@ -0,0 +1,1299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.es; + +import static org.janusgraph.diskstorage.es.ElasticSearchIndex.*; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import org.janusgraph.core.Cardinality; +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.core.attribute.Cmp; +import org.janusgraph.core.attribute.Geo; +import org.janusgraph.core.attribute.Geoshape; +import org.janusgraph.core.attribute.Text; +import org.janusgraph.core.schema.Mapping; +import org.janusgraph.core.schema.Parameter; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.BaseTransaction; +import org.janusgraph.diskstorage.BaseTransactionConfig; +import org.janusgraph.diskstorage.BaseTransactionConfigurable; +import org.janusgraph.diskstorage.PermanentBackendException; +import org.janusgraph.diskstorage.TemporaryBackendException; +import org.janusgraph.diskstorage.configuration.ConfigOption; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.es.compat.AbstractESCompat; +import org.janusgraph.diskstorage.es.compat.ESCompatUtils; +import org.janusgraph.diskstorage.es.mapping.IndexMapping; +import org.janusgraph.diskstorage.es.script.ESScriptResponse; +import org.janusgraph.diskstorage.indexing.IndexEntry; +import org.janusgraph.diskstorage.indexing.IndexFeatures; +import org.janusgraph.diskstorage.indexing.IndexMutation; +import org.janusgraph.diskstorage.indexing.IndexProvider; +import org.janusgraph.diskstorage.indexing.IndexQuery; +import org.janusgraph.diskstorage.indexing.KeyInformation; +import org.janusgraph.diskstorage.indexing.RawQuery; +import org.janusgraph.diskstorage.util.DefaultTransaction; +import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; +import org.janusgraph.graphdb.database.serialize.AttributeUtils; +import org.janusgraph.graphdb.query.JanusGraphPredicate; +import org.janusgraph.graphdb.query.condition.And; +import org.janusgraph.graphdb.query.condition.Condition; +import org.janusgraph.graphdb.query.condition.Not; +import org.janusgraph.graphdb.query.condition.Or; +import org.janusgraph.graphdb.query.condition.PredicateCondition; +import org.janusgraph.graphdb.types.ParameterType; +import org.locationtech.spatial4j.shape.Rectangle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_DOC_KEY; +import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_GEO_COORDS_KEY; +import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_LANG_KEY; +import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_SCRIPT_KEY; +import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_TYPE_KEY; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME; + +/** + * Do not change + * This is a copy of ElasticSearchIndex.java from org.janusgraph.diskstorage.es + * Added a new method to new client instance + */ + +@PreInitializeConfigOptions +public class ElasticSearch7Index implements IndexProvider { + + private static final Logger log = LoggerFactory.getLogger(ElasticSearchIndex.class); + + // add elasticsearch index instance(Singleton Pattern) + private static ElasticSearch7Index instance = null; + + private static final String STRING_MAPPING_SUFFIX = "__STRING"; + // add if need new instance + public static final ConfigOption<Boolean> CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST = new ConfigOption(ELASTICSEARCH_NS, "create-client-per-request", "when false, allows the sharing of es client across other components.", org.janusgraph.diskstorage.configuration.ConfigOption.Type.LOCAL, false); + + private static final String PARAMETERIZED_DELETION_SCRIPT = parameterizedScriptPrepare("", + "for (field in params.fields) {", + " if (field.cardinality == 'SINGLE') {", + " ctx._source.remove(field.name);", + " } else if (ctx._source.containsKey(field.name)) {", + " def fieldIndex = ctx._source[field.name].indexOf(field.value);", + " if (fieldIndex >= 0 && fieldIndex < ctx._source[field.name].size()) {", + " ctx._source[field.name].remove(fieldIndex);", + " }", + " }", + "}"); + + private static final String PARAMETERIZED_ADDITION_SCRIPT = parameterizedScriptPrepare("", + "for (field in params.fields) {", + " if (ctx._source[field.name] == null) {", + " ctx._source[field.name] = [];", + " }", + " if (field.cardinality != 'SET' || ctx._source[field.name].indexOf(field.value) == -1) {", + " ctx._source[field.name].add(field.value);", + " }", + "}"); + + static final String INDEX_NAME_SEPARATOR = "_"; + private static final String SCRIPT_ID_SEPARATOR = "-"; + + private static final String MAX_OPEN_SCROLL_CONTEXT_PARAMETER = "search.max_open_scroll_context"; + private static final Map<String, Object> MAX_RESULT_WINDOW = ImmutableMap.of("index.max_result_window", Integer.MAX_VALUE); + + private static final Parameter[] NULL_PARAMETERS = null; + + private static final String TRACK_TOTAL_HITS_PARAMETER = "track_total_hits"; + private static final Parameter[] TRACK_TOTAL_HITS_DISABLED_PARAMETERS = new Parameter[]{new Parameter<>(TRACK_TOTAL_HITS_PARAMETER, false)}; + private static final Map<String, Object> TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY = ImmutableMap.of(TRACK_TOTAL_HITS_PARAMETER, false); + + private final Function<String, String> generateIndexStoreNameFunction = this::generateIndexStoreName; + private final Map<String, String> indexStoreNamesCache = new ConcurrentHashMap<>(); + private final boolean indexStoreNameCacheEnabled; + + private final AbstractESCompat compat; + private final ElasticSearchClient client; + private final Configuration configuration; + private final String indexName; + private final int batchSize; + private final boolean useExternalMappings; + private final boolean allowMappingUpdate; + private final Map<String, Object> indexSetting; + private final long createSleep; + private final boolean useAllField; + private final Map<String, Object> ingestPipelines; + private final boolean useMappingForES7; + private final String parameterizedAdditionScriptId; + private final String parameterizedDeletionScriptId; + + private static boolean createElasticSearchClientPerRequest; + + public ElasticSearch7Index(Configuration config) throws BackendException { + // fetch configuration + this.configuration = config; + indexName = config.get(INDEX_NAME); + parameterizedAdditionScriptId = generateScriptId("add"); + parameterizedDeletionScriptId = generateScriptId("del"); + useAllField = config.get(USE_ALL_FIELD); + useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS); + allowMappingUpdate = config.get(ALLOW_MAPPING_UPDATE); + createSleep = config.get(CREATE_SLEEP); + ingestPipelines = config.getSubset(ES_INGEST_PIPELINES); + useMappingForES7 = config.get(USE_MAPPING_FOR_ES7); + indexStoreNameCacheEnabled = config.get(ENABLE_INDEX_STORE_NAMES_CACHE); + batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE); + log.debug("Configured ES query nb result by query to {}", batchSize); + + client = createElasticSearchClient(); + createElasticSearchClientPerRequest = config.get(CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST); + + checkClusterHealth(config.get(HEALTH_REQUEST_TIMEOUT)); + + compat = ESCompatUtils.acquireCompatForVersion(client.getMajorVersion()); + + indexSetting = ElasticSearchSetup.getSettingsFromJanusGraphConf(config); + + setupMaxOpenScrollContextsIfNeeded(config); + + setupStoredScripts(); + + //set instance + ElasticSearch7Index.instance = this; + } + + + //get client + public static ElasticSearchClient getElasticSearchClient() { + ElasticSearchClient ret = null; + ElasticSearch7Index esIndex = ElasticSearch7Index.instance; + + if (esIndex != null) { + if (createElasticSearchClientPerRequest) { + log.debug("Creating a new ElasticSearch Client."); + + ret = esIndex.createElasticSearchClient(); + } else { + log.debug("Returning the elasticSearch client owned by ElasticSearchIndex."); + + ret = esIndex.client; + } + } else { + log.debug("No ElasticSearchIndex available. Will return null"); + } + + return ret; + } + + //release client + public static void releaseElasticSearchClient(ElasticSearchClient elasticSearchClient) { + if(createElasticSearchClientPerRequest) { + if (elasticSearchClient != null) { + try { + elasticSearchClient.close(); + + if(log.isDebugEnabled()) { + log.debug("Closed the elasticSearch client successfully."); + } + } catch (IOException excp) { + log.warn("Failed to close elasticSearchClient.", excp); + } + } + } else { + if(log.isDebugEnabled()) { + log.debug("Ignoring the closing of elasticSearch client as it is owned by ElasticSearchIndex."); + } + } + } + + //create client + private ElasticSearchClient createElasticSearchClient() { + return interfaceConfiguration(configuration).getClient(); + } + + + + private void checkClusterHealth(String healthCheck) throws BackendException { + try { + client.clusterHealthRequest(healthCheck); + } catch (final IOException e) { + throw new PermanentBackendException(e.getMessage(), e); + } + } + + private void setupStoredScripts() throws PermanentBackendException { + setupStoredScriptIfNeeded(parameterizedAdditionScriptId, PARAMETERIZED_ADDITION_SCRIPT); + setupStoredScriptIfNeeded(parameterizedDeletionScriptId, PARAMETERIZED_DELETION_SCRIPT); + } + + private void setupStoredScriptIfNeeded(String storedScriptId, String source) throws PermanentBackendException { + + ImmutableMap<String, Object> preparedScript = compat.prepareScript(source).build(); + + String lang = (String) ((ImmutableMap<String, Object>) preparedScript.get(ES_SCRIPT_KEY)).get(ES_LANG_KEY); + + try { + ESScriptResponse esScriptResponse = client.getStoredScript(storedScriptId); + + if(Boolean.FALSE.equals(esScriptResponse.getFound()) || !Objects.equals(lang, esScriptResponse.getScript().getLang()) || + !Objects.equals(source, esScriptResponse.getScript().getSource())){ + client.createStoredScript(storedScriptId, preparedScript); + } + + } catch (final IOException e) { + throw new PermanentBackendException(e.getMessage(), e); + } + } + + private void setupMaxOpenScrollContextsIfNeeded(Configuration config) throws PermanentBackendException { + + if(client.getMajorVersion().getValue() > 6){ + + boolean setupMaxOpenScrollContexts; + + if(config.has(SETUP_MAX_OPEN_SCROLL_CONTEXTS)){ + setupMaxOpenScrollContexts = config.get(SETUP_MAX_OPEN_SCROLL_CONTEXTS); + } else { + setupMaxOpenScrollContexts = SETUP_MAX_OPEN_SCROLL_CONTEXTS.getDefaultValue(); + } + + if(setupMaxOpenScrollContexts){ + + Map<String, Object> settings = ImmutableMap.of("persistent", + ImmutableMap.of(MAX_OPEN_SCROLL_CONTEXT_PARAMETER, Integer.MAX_VALUE)); + + try { + client.updateClusterSettings(settings); + } catch (final IOException e) { + throw new PermanentBackendException(e.getMessage(), e); + } + } + } + } + + /** + * If ES already contains this instance's target index, then do nothing. + * Otherwise, create the index, then wait . + * <p> + * The {@code client} field must point to a live, connected client. + * The {@code indexName} field must be non-null and point to the name + * of the index to check for existence or create. + * + * @param index index name + * @throws IOException if the index status could not be checked or index could not be created + */ + private void checkForOrCreateIndex(String index) throws IOException { + Objects.requireNonNull(client); + Objects.requireNonNull(index); + + // Create index if it does not useExternalMappings and if it does not already exist + if (!useExternalMappings && !client.indexExists(index)) { + client.createIndex(index, indexSetting); + client.updateIndexSettings(index, MAX_RESULT_WINDOW); + try { + log.debug("Sleeping {} ms after {} index creation returned from actionGet()", createSleep, index); + Thread.sleep(createSleep); + } catch (final InterruptedException e) { + throw new JanusGraphException("Interrupted while waiting for index to settle in", e); + } + } + Preconditions.checkState(client.indexExists(index), "Could not create index: %s",index); + client.addAlias(indexName, index); + } + + + /** + * Configure ElasticSearchIndex's ES client. See{@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more + * information. + * + * @param config a config passed to ElasticSearchIndex's constructor + * @return a client object open and ready for use + */ + private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) { + final ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class); + + try { + return clientMode.connect(config); + } catch (final IOException e) { + throw new JanusGraphException(e); + } + } + + private BackendException convert(Exception esException) { + if (esException instanceof InterruptedException) { + return new TemporaryBackendException("Interrupted while waiting for response", esException); + } else { + return new PermanentBackendException("Unknown exception while executing index operation", esException); + } + } + + private static String getDualMappingName(String key) { + return key + STRING_MAPPING_SUFFIX; + } + + private String generateScriptId(String uniqueScriptSuffix){ + return indexName + SCRIPT_ID_SEPARATOR + uniqueScriptSuffix; + } + + private String generateIndexStoreName(String store){ + return indexName + INDEX_NAME_SEPARATOR + store.toLowerCase(); + } + + private String getIndexStoreName(String store) { + + if(indexStoreNameCacheEnabled){ + return indexStoreNamesCache.computeIfAbsent(store, generateIndexStoreNameFunction); + } + + return generateIndexStoreName(store); + } + + @Override + public void register(String store, String key, KeyInformation information, + BaseTransaction tx) throws BackendException { + final Class<?> dataType = information.getDataType(); + final Mapping map = Mapping.getMapping(information); + Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtils.isString(dataType) || + (map==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType)), + "Specified illegal mapping [%s] for data type [%s]",map,dataType); + final String indexStoreName = getIndexStoreName(store); + if (useExternalMappings) { + try { + //We check if the externalMapping have the property 'key' + final IndexMapping mappings = client.getMapping(indexStoreName, store); + if (mappings == null || (!mappings.isDynamic() && !mappings.getProperties().containsKey(key))) { + //Error if it is not dynamic and have not the property 'key' + throw new PermanentBackendException("The external mapping for index '"+ indexStoreName + "' and type '" + store + "' do not have property '" + key + "'"); + } else if (allowMappingUpdate && mappings.isDynamic()) { + //If it is dynamic, we push the unknown property 'key' + this.pushMapping(store, key, information); + } + } catch (final IOException e) { + throw new PermanentBackendException(e); + } + } else { + try { + checkForOrCreateIndex(indexStoreName); + } catch (final IOException e) { + throw new PermanentBackendException(e); + } + this.pushMapping(store, key, information); + } + } + + /** + * Push mapping to ElasticSearch + * @param store the type in the index + * @param key the name of the property in the index + * @param information information of the key + */ + private void pushMapping(String store, String key, + KeyInformation information) throws AssertionError, BackendException { + final Class<?> dataType = information.getDataType(); + Mapping map = Mapping.getMapping(information); + final Map<String,Object> properties = new HashMap<>(); + if (AttributeUtils.isString(dataType)) { + if (map==Mapping.DEFAULT) map=Mapping.TEXT; + log.debug("Registering string type for {} with mapping {}", key, map); + final String stringAnalyzer + = ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null); + final String textAnalyzer = ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null); + // use keyword type for string mappings unless custom string analyzer is provided + final Map<String,Object> stringMapping + = stringAnalyzer == null ? compat.createKeywordMapping() : compat.createTextMapping(stringAnalyzer); + switch (map) { + case STRING: + properties.put(key, stringMapping); + break; + case TEXT: + properties.put(key, compat.createTextMapping(textAnalyzer)); + break; + case TEXTSTRING: + properties.put(key, compat.createTextMapping(textAnalyzer)); + properties.put(getDualMappingName(key), stringMapping); + break; + default: throw new AssertionError("Unexpected mapping: "+map); + } + } else if (dataType == Float.class) { + log.debug("Registering float type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "float")); + } else if (dataType == Double.class) { + log.debug("Registering double type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "double")); + } else if (dataType == Byte.class) { + log.debug("Registering byte type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "byte")); + } else if (dataType == Short.class) { + log.debug("Registering short type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "short")); + } else if (dataType == Integer.class) { + log.debug("Registering integer type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "integer")); + } else if (dataType == Long.class) { + log.debug("Registering long type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "long")); + } else if (dataType == Boolean.class) { + log.debug("Registering boolean type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "boolean")); + } else if (dataType == Geoshape.class) { + switch (map) { + case PREFIX_TREE: + final int maxLevels = ParameterType.INDEX_GEO_MAX_LEVELS.findParameter(information.getParameters(), + DEFAULT_GEO_MAX_LEVELS); + final double distErrorPct + = ParameterType.INDEX_GEO_DIST_ERROR_PCT.findParameter(information.getParameters(), + DEFAULT_GEO_DIST_ERROR_PCT); + log.debug("Registering geo_shape type for {} with tree_levels={} and distance_error_pct={}", key, + maxLevels, distErrorPct); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_shape", + "tree", "quadtree", + "tree_levels", maxLevels, + "distance_error_pct", distErrorPct)); + break; + default: + log.debug("Registering geo_point type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_point")); + } + } else if (dataType == Date.class || dataType == Instant.class) { + log.debug("Registering date type for {}", key); + properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "date")); + } else if (dataType == UUID.class) { + log.debug("Registering uuid type for {}", key); + properties.put(key, compat.createKeywordMapping()); + } + + if (useAllField) { + // add custom all field mapping if it doesn't exist + properties.put(ElasticSearchConstants.CUSTOM_ALL_FIELD, compat.createTextMapping(null)); + + // add copy_to for custom all field mapping + if (properties.containsKey(key) && dataType != Geoshape.class) { + final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key))); + mapping.put("copy_to", ElasticSearchConstants.CUSTOM_ALL_FIELD); + properties.put(key, mapping); + } + } + + final List<Parameter> customParameters = ParameterType.getCustomParameters(information.getParameters()); + + if (properties.containsKey(key) && !customParameters.isEmpty()) { + final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key))); + customParameters.forEach(p -> mapping.put(p.key(), p.value())); + properties.put(key, mapping); + } + + final Map<String,Object> mapping = ImmutableMap.of("properties", properties); + + try { + client.createMapping(getIndexStoreName(store), store, mapping); + } catch (final Exception e) { + throw convert(e); + } + } + + private static Mapping getStringMapping(KeyInformation information) { + assert AttributeUtils.isString(information.getDataType()); + Mapping map = Mapping.getMapping(information); + if (map==Mapping.DEFAULT) map = Mapping.TEXT; + return map; + } + + private static boolean hasDualStringMapping(KeyInformation information) { + return AttributeUtils.isString(information.getDataType()) && getStringMapping(information)==Mapping.TEXTSTRING; + } + + public Map<String, Object> getNewDocument(final List<IndexEntry> additions, + KeyInformation.StoreRetriever information) throws BackendException { + // JSON writes duplicate fields one after another, which forces us + // at this stage to make de-duplication on the IndexEntry list. We don't want to pay the + // price map storage on the Mutation level because none of other backends need that. + + final Multimap<String, IndexEntry> unique = LinkedListMultimap.create(); + for (final IndexEntry e : additions) { + unique.put(e.field, e); + } + + final Map<String, Object> doc = new HashMap<>(); + for (final Map.Entry<String, Collection<IndexEntry>> add : unique.asMap().entrySet()) { + final KeyInformation keyInformation = information.get(add.getKey()); + final Object value; + switch (keyInformation.getCardinality()) { + case SINGLE: + value = convertToEsType(Iterators.getLast(add.getValue().iterator()).value, + Mapping.getMapping(keyInformation)); + break; + case SET: + case LIST: + value = add.getValue().stream() + .map(v -> convertToEsType(v.value, Mapping.getMapping(keyInformation))) + .filter(v -> { + Preconditions.checkArgument(!(v instanceof byte[]), + "Collections not supported for %s", add.getKey()); + return true; + }).toArray(); + break; + default: + value = null; + break; + } + + doc.put(add.getKey(), value); + if (hasDualStringMapping(information.get(add.getKey())) && keyInformation.getDataType() == String.class) { + doc.put(getDualMappingName(add.getKey()), value); + } + + + } + + return doc; + } + + private static Object convertToEsType(Object value, Mapping mapping) { + if (value instanceof Number) { + if (AttributeUtils.isWholeNumber((Number) value)) { + return ((Number) value).longValue(); + } else { //double or float + return ((Number) value).doubleValue(); + } + } else if (AttributeUtils.isString(value)) { + return value; + } else if (value instanceof Geoshape) { + return convertGeoshape((Geoshape) value, mapping); + } else if (value instanceof Date) { + return value; + } else if (value instanceof Instant) { + return Date.from((Instant) value); + } else if (value instanceof Boolean) { + return value; + } else if (value instanceof UUID) { + return value.toString(); + } else throw new IllegalArgumentException("Unsupported type: " + value.getClass() + " (value: " + value + ")"); + } + + @SuppressWarnings("unchecked") + private static Object convertGeoshape(Geoshape geoshape, Mapping mapping) { + if (geoshape.getType() == Geoshape.Type.POINT && Mapping.PREFIX_TREE != mapping) { + final Geoshape.Point p = geoshape.getPoint(); + return new double[]{p.getLongitude(), p.getLatitude()}; + } else if (geoshape.getType() == Geoshape.Type.BOX) { + final Rectangle box = geoshape.getShape().getBoundingBox(); + final Map<String,Object> map = new HashMap<>(); + map.put("type", "envelope"); + map.put("coordinates", new double[][] {{box.getMinX(),box.getMaxY()},{box.getMaxX(),box.getMinY()}}); + return map; + } else if (geoshape.getType() == Geoshape.Type.CIRCLE) { + try { + final Map<String,Object> map = geoshape.toMap(); + map.put("radius", map.get("radius") + ((Map<String, String>) map.remove("properties")).get("radius_units")); + return map; + } catch (final IOException e) { + throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e); + } + } else { + try { + return geoshape.toMap(); + } catch (final IOException e) { + throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e); + } + } + } + + @Override + public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever information, + BaseTransaction tx) throws BackendException { + final List<ElasticSearchMutation> requests = new ArrayList<>(); + try { + for (final Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) { + final List<ElasticSearchMutation> requestByStore = new ArrayList<>(); + final String storeName = stores.getKey(); + final String indexStoreName = getIndexStoreName(storeName); + for (final Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) { + final String documentId = entry.getKey(); + final IndexMutation mutation = entry.getValue(); + assert mutation.isConsolidated(); + Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); + Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); + Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); + //Deletions first + if (mutation.hasDeletions()) { + if (mutation.isDeleted()) { + log.trace("Deleting entire document {}", documentId); + requestByStore.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, storeName, + documentId)); + } else { + List<Map<String, Object>> params = getParameters(information.get(storeName), + mutation.getDeletions(), true); + Map doc = compat.prepareStoredScript(parameterizedDeletionScriptId, params).build(); + log.trace("Deletion script {} with params {}", PARAMETERIZED_DELETION_SCRIPT, params); + requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName, + documentId, doc)); + } + } + if (mutation.hasAdditions()) { + if (mutation.isNew()) { //Index + log.trace("Adding entire document {}", documentId); + final Map<String, Object> source = getNewDocument(mutation.getAdditions(), + information.get(storeName)); + requestByStore.add(ElasticSearchMutation.createIndexRequest(indexStoreName, storeName, + documentId, source)); + } else { + final Map upsert; + if (!mutation.hasDeletions()) { + upsert = getNewDocument(mutation.getAdditions(), information.get(storeName)); + } else { + upsert = null; + } + + List<Map<String, Object>> params = getParameters(information.get(storeName), + mutation.getAdditions(), false, Cardinality.SINGLE); + if (!params.isEmpty()) { + ImmutableMap.Builder builder = compat.prepareStoredScript(parameterizedAdditionScriptId, params); + requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName, + documentId, builder, upsert)); + log.trace("Adding script {} with params {}", PARAMETERIZED_ADDITION_SCRIPT, params); + } + + final Map<String, Object> doc = getAdditionDoc(information, storeName, mutation); + if (!doc.isEmpty()) { + final ImmutableMap.Builder builder = ImmutableMap.builder().put(ES_DOC_KEY, doc); + requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName, + documentId, builder, upsert)); + log.trace("Adding update {}", doc); + } + } + } + } + if (!requestByStore.isEmpty() && ingestPipelines.containsKey(storeName)) { + client.bulkRequest(requestByStore, String.valueOf(ingestPipelines.get(storeName))); + } else if (!requestByStore.isEmpty()) { + requests.addAll(requestByStore); + } + } + if (!requests.isEmpty()) { + client.bulkRequest(requests, null); + } + } catch (final Exception e) { + log.error("Failed to execute bulk Elasticsearch mutation", e); + throw convert(e); + } + } + + private List<Map<String, Object>> getParameters(KeyInformation.StoreRetriever storeRetriever, + List<IndexEntry> entries, + boolean deletion, + Cardinality... cardinalitiesToSkip) { + Set<Cardinality> cardinalityToSkipSet = Sets.newHashSet(cardinalitiesToSkip); + List<Map<String, Object>> result = new ArrayList<>(); + for (IndexEntry entry : entries) { + KeyInformation info = storeRetriever.get(entry.field); + if (cardinalityToSkipSet.contains(info.getCardinality())) { + continue; + } + Object jsValue = deletion && info.getCardinality() == Cardinality.SINGLE ? + "" : convertToEsType(entry.value, Mapping.getMapping(info)); + result.add(ImmutableMap.of("name", entry.field, + "value", jsValue, + "cardinality", info.getCardinality().name())); + if (hasDualStringMapping(info)) { + result.add(ImmutableMap.of("name", getDualMappingName(entry.field), + "value", jsValue, + "cardinality", info.getCardinality().name())); + } + } + return result; + } + + private Map<String,Object> getAdditionDoc(KeyInformation.IndexRetriever information, + String store, IndexMutation mutation) throws PermanentBackendException { + final Map<String,Object> doc = new HashMap<>(); + for (final IndexEntry e : mutation.getAdditions()) { + final KeyInformation keyInformation = information.get(store).get(e.field); + if (keyInformation.getCardinality() == Cardinality.SINGLE) { + doc.put(e.field, convertToEsType(e.value, Mapping.getMapping(keyInformation))); + if (hasDualStringMapping(keyInformation)) { + doc.put(getDualMappingName(e.field), convertToEsType(e.value, Mapping.getMapping(keyInformation))); + } + } + } + + return doc; + } + + @Override + public void restore(Map<String,Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever information, + BaseTransaction tx) throws BackendException { + final List<ElasticSearchMutation> requests = new ArrayList<>(); + try { + for (final Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) { + final List<ElasticSearchMutation> requestByStore = new ArrayList<>(); + final String store = stores.getKey(); + final String indexStoreName = getIndexStoreName(store); + for (final Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) { + final String docID = entry.getKey(); + final List<IndexEntry> content = entry.getValue(); + if (content == null || content.size() == 0) { + // delete + if (log.isTraceEnabled()) + log.trace("Deleting entire document {}", docID); + + requestByStore.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, store, docID)); + } else { + // Add + if (log.isTraceEnabled()) + log.trace("Adding entire document {}", docID); + final Map<String, Object> source = getNewDocument(content, information.get(store)); + requestByStore.add(ElasticSearchMutation.createIndexRequest(indexStoreName, store, docID, + source)); + } + } + if (!requestByStore.isEmpty() && ingestPipelines.containsKey(store)) { + client.bulkRequest(requestByStore, String.valueOf(ingestPipelines.get(store))); + } else if (!requestByStore.isEmpty()) { + requests.addAll(requestByStore); + } + } + if (!requests.isEmpty()) + client.bulkRequest(requests, null); + } catch (final Exception e) { + throw convert(e); + } + } + + private Map<String, Object> getRelationFromCmp(final Cmp cmp, String key, final Object value) { + switch (cmp) { + case EQUAL: + return compat.term(key, value); + case NOT_EQUAL: + return compat.boolMustNot(compat.term(key, value)); + case LESS_THAN: + return compat.lt(key, value); + case LESS_THAN_EQUAL: + return compat.lte(key, value); + case GREATER_THAN: + return compat.gt(key, value); + case GREATER_THAN_EQUAL: + return compat.gte(key, value); + default: + throw new IllegalArgumentException("Unexpected relation: " + cmp); + } + } + + public Map<String,Object> getFilter(Condition<?> condition, KeyInformation.StoreRetriever information) { + if (condition instanceof PredicateCondition) { + final PredicateCondition<String, ?> atom = (PredicateCondition) condition; + Object value = atom.getValue(); + final String key = atom.getKey(); + final JanusGraphPredicate predicate = atom.getPredicate(); + if (value == null && predicate == Cmp.NOT_EQUAL) { + return compat.exists(key); + } else if (value instanceof Number) { + Preconditions.checkArgument(predicate instanceof Cmp, + "Relation not supported on numeric types: %s", predicate); + return getRelationFromCmp((Cmp) predicate, key, value); + } else if (value instanceof String) { + + final Mapping mapping = getStringMapping(information.get(key)); + final String fieldName; + if (mapping==Mapping.TEXT && !(Text.HAS_CONTAINS.contains(predicate) || predicate instanceof Cmp)) + throw new IllegalArgumentException("Text mapped string values only support CONTAINS and Compare queries and not: " + predicate); + if (mapping==Mapping.STRING && Text.HAS_CONTAINS.contains(predicate)) + throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + predicate); + if (mapping==Mapping.TEXTSTRING && !(Text.HAS_CONTAINS.contains(predicate) || (predicate instanceof Cmp && predicate != Cmp.EQUAL))) { + fieldName = getDualMappingName(key); + } else { + fieldName = key; + } + + if (predicate == Text.CONTAINS || predicate == Cmp.EQUAL) { + return compat.match(fieldName, value); + } else if (predicate == Text.NOT_CONTAINS) { + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.match(fieldName, value)))); + } else if (predicate == Text.CONTAINS_PHRASE) { + return compat.matchPhrase(fieldName, value); + } else if (predicate == Text.NOT_CONTAINS_PHRASE) { + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.matchPhrase(fieldName, value)))); + } else if (predicate == Text.CONTAINS_PREFIX) { + if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters())) + value = ((String) value).toLowerCase(); + return compat.prefix(fieldName, value); + } else if (predicate == Text.NOT_CONTAINS_PREFIX) { + if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters())) + value = ((String) value).toLowerCase(); + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.prefix(fieldName, value)))); + } else if (predicate == Text.CONTAINS_REGEX) { + if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters())) + value = ((String) value).toLowerCase(); + return compat.regexp(fieldName, value); + } else if (predicate == Text.NOT_CONTAINS_REGEX) { + if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters())) + value = ((String) value).toLowerCase(); + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.regexp(fieldName, value)))); + } else if (predicate == Text.PREFIX) { + return compat.prefix(fieldName, value); + } else if (predicate == Text.NOT_PREFIX) { + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.prefix(fieldName, value)))); + } else if (predicate == Text.REGEX) { + return compat.regexp(fieldName, value); + } else if (predicate == Text.NOT_REGEX) { + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.regexp(fieldName, value)))); + } else if (predicate == Cmp.NOT_EQUAL) { + return compat.boolMustNot(compat.match(fieldName, value)); + } else if (predicate == Text.FUZZY || predicate == Text.CONTAINS_FUZZY) { + return compat.fuzzyMatch(fieldName, value); + } else if (predicate == Text.NOT_FUZZY || predicate == Text.NOT_CONTAINS_FUZZY) { + return compat.boolMust(ImmutableList.of(compat.exists(fieldName), + compat.boolMustNot(compat.fuzzyMatch(fieldName, value)))); + } else if (predicate == Cmp.LESS_THAN) { + return compat.lt(fieldName, value); + } else if (predicate == Cmp.LESS_THAN_EQUAL) { + return compat.lte(fieldName, value); + } else if (predicate == Cmp.GREATER_THAN) { + return compat.gt(fieldName, value); + } else if (predicate == Cmp.GREATER_THAN_EQUAL) { + return compat.gte(fieldName, value); + } else + throw new IllegalArgumentException("Predicate is not supported for string value: " + predicate); + } else if (value instanceof Geoshape && Mapping.getMapping(information.get(key)) == Mapping.DEFAULT) { + // geopoint + final Geoshape shape = (Geoshape) value; + Preconditions.checkArgument(predicate instanceof Geo && predicate != Geo.CONTAINS, + "Relation not supported on geopoint types: %s", predicate); + + final Map<String,Object> query; + switch (shape.getType()) { + case CIRCLE: + final Geoshape.Point center = shape.getPoint(); + query = compat.geoDistance(key, center.getLatitude(), center.getLongitude(), shape.getRadius()); + break; + case BOX: + final Geoshape.Point southwest = shape.getPoint(0); + final Geoshape.Point northeast = shape.getPoint(1); + query = compat.geoBoundingBox(key, southwest.getLatitude(), southwest.getLongitude(), + northeast.getLatitude(), northeast.getLongitude()); + break; + case POLYGON: + final List<List<Double>> points = IntStream.range(0, shape.size()) + .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(), + shape.getPoint(i).getLatitude())) + .collect(Collectors.toList()); + query = compat.geoPolygon(key, points); + break; + default: + throw new IllegalArgumentException("Unsupported or invalid search shape type for geopoint: " + + shape.getType()); + } + + return predicate == Geo.DISJOINT ? compat.boolMustNot(query) : query; + } else if (value instanceof Geoshape) { + Preconditions.checkArgument(predicate instanceof Geo, + "Relation not supported on geoshape types: %s", predicate); + final Geoshape shape = (Geoshape) value; + final Map<String,Object> geo; + switch (shape.getType()) { + case CIRCLE: + final Geoshape.Point center = shape.getPoint(); + geo = ImmutableMap.of(ES_TYPE_KEY, "circle", + ES_GEO_COORDS_KEY, ImmutableList.of(center.getLongitude(), center.getLatitude()), + "radius", shape.getRadius() + "km"); + break; + case BOX: + final Geoshape.Point southwest = shape.getPoint(0); + final Geoshape.Point northeast = shape.getPoint(1); + geo = ImmutableMap.of(ES_TYPE_KEY, "envelope", + ES_GEO_COORDS_KEY, + ImmutableList.of( + ImmutableList.of(southwest.getLongitude(), northeast.getLatitude()), + ImmutableList.of(northeast.getLongitude(), southwest.getLatitude()))); + break; + case LINE: + final List lineCoords = IntStream.range(0, shape.size()) + .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(), + shape.getPoint(i).getLatitude())) + .collect(Collectors.toList()); + geo = ImmutableMap.of(ES_TYPE_KEY, "linestring", ES_GEO_COORDS_KEY, lineCoords); + break; + case POLYGON: + final List polyCoords = IntStream.range(0, shape.size()) + .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(), + shape.getPoint(i).getLatitude())) + .collect(Collectors.toList()); + geo = ImmutableMap.of(ES_TYPE_KEY, "polygon", ES_GEO_COORDS_KEY, + ImmutableList.of(polyCoords)); + break; + case POINT: + geo = ImmutableMap.of(ES_TYPE_KEY, "point", + ES_GEO_COORDS_KEY, ImmutableList.of(shape.getPoint().getLongitude(), + shape.getPoint().getLatitude())); + break; + default: + throw new IllegalArgumentException("Unsupported or invalid search shape type: " + + shape.getType()); + } + + return compat.geoShape(key, geo, (Geo) predicate); + } else if (value instanceof Date || value instanceof Instant) { + Preconditions.checkArgument(predicate instanceof Cmp, + "Relation not supported on date types: %s", predicate); + + if (value instanceof Instant) { + value = Date.from((Instant) value); + } + return getRelationFromCmp((Cmp) predicate, key, value); + } else if (value instanceof Boolean) { + final Cmp numRel = (Cmp) predicate; + switch (numRel) { + case EQUAL: + return compat.term(key, value); + case NOT_EQUAL: + return compat.boolMustNot(compat.term(key, value)); + default: + throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL"); + } + } else if (value instanceof UUID) { + if (predicate == Cmp.EQUAL) { + return compat.term(key, value); + } else if (predicate == Cmp.NOT_EQUAL) { + return compat.boolMustNot(compat.term(key, value)); + } else { + throw new IllegalArgumentException("Only equal or not equal is supported for UUIDs: " + + predicate); + } + } else throw new IllegalArgumentException("Unsupported type: " + value); + } else if (condition instanceof Not) { + return compat.boolMustNot(getFilter(((Not) condition).getChild(),information)); + } else if (condition instanceof And) { + final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false) + .map(c -> getFilter(c,information)).collect(Collectors.toList()); + return compat.boolMust(queries); + } else if (condition instanceof Or) { + final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false) + .map(c -> getFilter(c,information)).collect(Collectors.toList()); + return compat.boolShould(queries); + } else throw new IllegalArgumentException("Invalid condition: " + condition); + } + + @Override + public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, + BaseTransaction tx) throws BackendException { + final ElasticSearchRequest sr = new ElasticSearchRequest(); + final Map<String,Object> esQuery = getFilter(query.getCondition(), informations.get(query.getStore())); + sr.setQuery(compat.prepareQuery(esQuery)); + if (!query.getOrder().isEmpty()) { + addOrderToQuery(informations, sr, query.getOrder(), query.getStore()); + } + sr.setFrom(0); + if (query.hasLimit()) { + sr.setSize(Math.min(query.getLimit(), batchSize)); + } else { + sr.setSize(batchSize); + } + + sr.setDisableSourceRetrieval(true); + + ElasticSearchResponse response; + try { + final String indexStoreName = getIndexStoreName(query.getStore()); + final boolean useScroll = sr.getSize() >= batchSize; + response = client.search(indexStoreName, + compat.createRequestBody(sr, useScroll? NULL_PARAMETERS : TRACK_TOTAL_HITS_DISABLED_PARAMETERS), + useScroll); + log.debug("First Executed query [{}] in {} ms", query.getCondition(), response.getTook()); + final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, sr.getSize()); + final Stream<RawQuery.Result<String>> toReturn + = StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false); + return (query.hasLimit() ? toReturn.limit(query.getLimit()) : toReturn).map(RawQuery.Result::getResult); + } catch (final IOException | UncheckedIOException e) { + throw new PermanentBackendException(e); + } + } + + private Iterator<RawQuery.Result<String>> getResultsIterator(boolean useScroll, ElasticSearchResponse response, int windowSize){ + return (useScroll)? new ElasticSearchScroll(client, response, windowSize) : response.getResults().iterator(); + } + + private String convertToEsDataType(Class<?> dataType, Mapping mapping) { + if(String.class.isAssignableFrom(dataType)) { + return "string"; + } + else if (Integer.class.isAssignableFrom(dataType)) { + return "integer"; + } + else if (Long.class.isAssignableFrom(dataType)) { + return "long"; + } + else if (Float.class.isAssignableFrom(dataType)) { + return "float"; + } + else if (Double.class.isAssignableFrom(dataType)) { + return "double"; + } + else if (Boolean.class.isAssignableFrom(dataType)) { + return "boolean"; + } + else if (Date.class.isAssignableFrom(dataType)) { + return "date"; + } + else if (Instant.class.isAssignableFrom(dataType)) { + return "date"; + } + else if (Geoshape.class.isAssignableFrom(dataType)) { + return mapping == Mapping.DEFAULT ? "geo_point" : "geo_shape"; + } + + return null; + } + + private ElasticSearchResponse runCommonQuery(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx, int size, + boolean useScroll) throws BackendException{ + final ElasticSearchRequest sr = new ElasticSearchRequest(); + sr.setQuery(compat.queryString(query.getQuery())); + if (!query.getOrders().isEmpty()) { + addOrderToQuery(informations, sr, query.getOrders(), query.getStore()); + } + sr.setFrom(0); + sr.setSize(size); + sr.setDisableSourceRetrieval(true); + try { + Map<String, Object> requestBody = compat.createRequestBody(sr, query.getParameters()); + if(!useScroll) { + if (requestBody == null) { + requestBody = TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY; + } else { + requestBody.put(TRACK_TOTAL_HITS_PARAMETER, false); + } + } + return client.search( + getIndexStoreName(query.getStore()), + requestBody, + useScroll); + } catch (final IOException | UncheckedIOException e) { + throw new PermanentBackendException(e); + } + } + + private long runCountQuery(RawQuery query) throws BackendException{ + try { + return client.countTotal( + getIndexStoreName(query.getStore()), + compat.createRequestBody(compat.queryString(query.getQuery()), query.getParameters())); + } catch (final IOException | UncheckedIOException e) { + throw new PermanentBackendException(e); + } + } + + private void addOrderToQuery(KeyInformation.IndexRetriever informations, ElasticSearchRequest sr, final List<IndexQuery.OrderEntry> orders, + String store) { + for (final IndexQuery.OrderEntry orderEntry : orders) { + final String order = orderEntry.getOrder().name(); + final KeyInformation information = informations.get(store).get(orderEntry.getKey()); + final Mapping mapping = Mapping.getMapping(information); + final Class<?> datatype = orderEntry.getDatatype(); + final String key = hasDualStringMapping(information) ? getDualMappingName(orderEntry.getKey()) : orderEntry.getKey(); + sr.addSort(key, order.toLowerCase(), convertToEsDataType(datatype, mapping)); + } + } + + @Override + public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever information, + BaseTransaction tx) throws BackendException { + final int size = query.hasLimit() ? Math.min(query.getLimit() + query.getOffset(), batchSize) : batchSize; + final boolean useScroll = size >= batchSize; + final ElasticSearchResponse response = runCommonQuery(query, information, tx, size, useScroll); + log.debug("First Executed query [{}] in {} ms", query.getQuery(), response.getTook()); + final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, size); + final Stream<RawQuery.Result<String>> toReturn + = StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), + false).skip(query.getOffset()); + return query.hasLimit() ? toReturn.limit(query.getLimit()) : toReturn; + } + + @Override + public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException { + final ElasticSearchRequest sr = new ElasticSearchRequest(); + final Map<String,Object> esQuery = getFilter(query.getCondition(), information.get(query.getStore())); + sr.setQuery(compat.prepareQuery(esQuery)); + try { + return client.countTotal( + getIndexStoreName(query.getStore()), + compat.createRequestBody(sr, null)); + } catch (final IOException | UncheckedIOException e) { + throw new PermanentBackendException(e); + } + } + + @Override + public Long totals(RawQuery query, KeyInformation.IndexRetriever information, + BaseTransaction tx) throws BackendException { + long startTime = System.currentTimeMillis(); + long count = runCountQuery(query); + if(log.isDebugEnabled()){ + log.debug("Executed count query [{}] in {} ms", query.getQuery(), System.currentTimeMillis() - startTime); + } + return count; + } + + @Override + public boolean supports(KeyInformation information, JanusGraphPredicate janusgraphPredicate) { + final Class<?> dataType = information.getDataType(); + final Mapping mapping = Mapping.getMapping(information); + if (mapping!=Mapping.DEFAULT && !AttributeUtils.isString(dataType) && + !(mapping==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType))) return false; + + if (Number.class.isAssignableFrom(dataType)) { + return janusgraphPredicate instanceof Cmp; + } else if (dataType == Geoshape.class) { + switch(mapping) { + case DEFAULT: + return janusgraphPredicate instanceof Geo && janusgraphPredicate != Geo.CONTAINS; + case PREFIX_TREE: + return janusgraphPredicate instanceof Geo; + } + } else if (AttributeUtils.isString(dataType)) { + switch(mapping) { + case DEFAULT: + case TEXT: + return janusgraphPredicate == Text.CONTAINS || janusgraphPredicate == Text.NOT_CONTAINS + || janusgraphPredicate == Text.CONTAINS_FUZZY || janusgraphPredicate == Text.NOT_CONTAINS_FUZZY + || janusgraphPredicate == Text.CONTAINS_PREFIX || janusgraphPredicate == Text.NOT_CONTAINS_PREFIX + || janusgraphPredicate == Text.CONTAINS_REGEX || janusgraphPredicate == Text.NOT_CONTAINS_REGEX + || janusgraphPredicate == Text.CONTAINS_PHRASE || janusgraphPredicate == Text.NOT_CONTAINS_PHRASE; + case STRING: + return janusgraphPredicate instanceof Cmp + || janusgraphPredicate==Text.REGEX || janusgraphPredicate==Text.NOT_REGEX + || janusgraphPredicate==Text.PREFIX || janusgraphPredicate==Text.NOT_PREFIX + || janusgraphPredicate == Text.FUZZY || janusgraphPredicate == Text.NOT_FUZZY; + case TEXTSTRING: + return janusgraphPredicate instanceof Text || janusgraphPredicate instanceof Cmp; + } + } else if (dataType == Date.class || dataType == Instant.class) { + return janusgraphPredicate instanceof Cmp; + } else if (dataType == Boolean.class) { + return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate == Cmp.NOT_EQUAL; + } else if (dataType == UUID.class) { + return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate==Cmp.NOT_EQUAL; + } + return false; + } + + + @Override + public boolean supports(KeyInformation information) { + final Class<?> dataType = information.getDataType(); + final Mapping mapping = Mapping.getMapping(information); + if (Number.class.isAssignableFrom(dataType) || dataType == Date.class || dataType== Instant.class + || dataType == Boolean.class || dataType == UUID.class) { + return mapping == Mapping.DEFAULT; + } else if (AttributeUtils.isString(dataType)) { + return mapping == Mapping.DEFAULT || mapping == Mapping.STRING + || mapping == Mapping.TEXT || mapping == Mapping.TEXTSTRING; + } else if (AttributeUtils.isGeo(dataType)) { + return mapping == Mapping.DEFAULT || mapping == Mapping.PREFIX_TREE; + } + return false; + } + + @Override + public String mapKey2Field(String key, KeyInformation information) { + IndexProvider.checkKeyValidity(key); + return key.replace(' ', IndexProvider.REPLACEMENT_CHAR); + } + + @Override + public IndexFeatures getFeatures() { + return compat.getIndexFeatures(); + } + + @Override + public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException { + return new DefaultTransaction(config); + } + + @Override + public void close() throws BackendException { + try { + client.close(); + } catch (final IOException e) { + throw new PermanentBackendException(e); + } + + } + + @Override + public void clearStorage() throws BackendException { + try { + client.deleteIndex(indexName); + } catch (final Exception e) { + throw new PermanentBackendException("Could not delete index " + indexName, e); + } finally { + close(); + } + } + + @Override + public boolean exists() throws BackendException { + try { + return client.indexExists(indexName); + } catch (final IOException e) { + throw new PermanentBackendException("Could not check if index " + indexName + " exists", e); + } + } + + ElasticMajorVersion getVersion() { + return client.getMajorVersion(); + } + + boolean isUseMappingForES7(){ + return useMappingForES7; + } + + private static String parameterizedScriptPrepare(String ... lines){ + return Arrays.stream(lines).map(String::trim).collect(Collectors.joining("")); + } +} \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java index 4a37c77..f83dff6 100644 --- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java +++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java @@ -50,14 +50,17 @@ public final class ApplicationProperties extends PropertiesConfiguration { public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend"; public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name"; public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher"; + public static final String ELASTICSEARCH_INDEX_NAME_CONF = "atlas.graph.index.search.elasticsearch.index-name"; public static final String INDEX_RECOVERY_CONF = "atlas.index.recovery.enable"; public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable"; public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable"; public static final String ATLAS_RUN_MODE = "atlas.run.mode"; public static final String GRAPHBD_BACKEND_JANUS = "janus"; + public static final String DEFAULT_INDEX_NAME = "janusgraph"; public static final String STORAGE_BACKEND_HBASE = "hbase"; public static final String STORAGE_BACKEND_HBASE2 = "hbase2"; public static final String INDEX_BACKEND_SOLR = "solr"; + public static final String INDEX_BACKEND_ELASTICSEARCH = "elasticsearch"; public static final String LDAP_TYPE = "atlas.authentication.method.ldap.type"; public static final String LDAP = "LDAP"; public static final String AD = "AD"; @@ -355,6 +358,10 @@ public final class ApplicationProperties extends PropertiesConfiguration { addPropertyDirect(INDEX_MAP_NAME_CONF, DEFAULT_INDEX_MAP_NAME); LOG.info("Setting index.search.map-name property '" + DEFAULT_INDEX_MAP_NAME + "'"); } + } else if (indexBackend.equalsIgnoreCase(INDEX_BACKEND_ELASTICSEARCH)){ + addPropertyDirect(ELASTICSEARCH_INDEX_NAME_CONF, DEFAULT_INDEX_NAME); + + LOG.info("Setting elasticsearch.index-name property '" + DEFAULT_INDEX_NAME + "'"); } // setting value for 'atlas.graph.index.search.max-result-set-size' (default = 500000) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java index b316354..57964d5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java @@ -166,13 +166,13 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { while (shouldRun.get()) { try { - boolean solrHealthy = isSolrHealthy(); + boolean isIdxHealthy = isIndexBackendHealthy(); - if (this.txRecoveryObject == null && solrHealthy) { + if (this.txRecoveryObject == null && isIdxHealthy) { startMonitoring(); } - if (this.txRecoveryObject != null && !solrHealthy) { + if (this.txRecoveryObject != null && !isIdxHealthy) { stopMonitoring(); } } catch (Exception e) { @@ -197,7 +197,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } } - private boolean isSolrHealthy() throws AtlasException, InterruptedException { + private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException { Thread.sleep(indexStatusCheckRetryMillis); return this.graph.getGraphIndexClient().isHealthy();