METAMODEL-1179: Upgraded the ElasticSearch REST module to a new client; it now uses the official Elastic high-level REST client for ElasticSearch.
Closes #177 Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/bda8d764 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/bda8d764 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/bda8d764 Branch: refs/heads/master Commit: bda8d764f65acdf3b1f520d21cb51a7396e23c7d Parents: c57d508 Author: Arjan Seijkens <[email protected]> Authored: Thu Jan 25 20:13:08 2018 -0800 Committer: Kasper Sørensen <[email protected]> Committed: Thu Jan 25 20:13:08 2018 -0800 ---------------------------------------------------------------------- .travis.yml | 5 +- CHANGES.md | 3 +- .../AbstractElasticSearchDataContext.java | 140 +++++ .../AbstractElasticSearchDataSet.java | 123 ++++ .../common/ElasticSearchDateConverter.java | 17 +- .../common/ElasticSearchMetaDataParser.java | 79 +++ .../common/ElasticSearchUtils.java | 80 +-- .../common/ElasticSearchUtilsTest.java | 63 ++ elasticsearch/native/pom.xml | 36 ++ .../ElasticSearchCreateTableBuilder.java | 8 +- .../nativeclient/ElasticSearchDataContext.java | 171 ++---- .../ElasticSearchDataContextFactory.java | 47 +- .../nativeclient/ElasticSearchDataSet.java | 90 +-- .../ElasticSearchDeleteBuilder.java | 27 +- .../ElasticSearchDropTableBuilder.java | 103 ---- .../ElasticSearchInsertBuilder.java | 8 +- .../ElasticSearchMetaDataParser.java | 81 --- .../ElasticSearchUpdateBuilder.java | 116 ++++ .../ElasticSearchUpdateCallback.java | 10 +- .../nativeclient/NativeElasticSearchUtils.java | 67 -- .../ElasticSearchDataContextTest.java | 231 +++---- .../ElasticSearchMetaDataParserTest.java | 1 + .../nativeclient/ElasticSearchUtilsTest.java | 63 -- .../utils/EmbeddedElasticsearchServer.java | 71 --- elasticsearch/pom.xml | 2 +- elasticsearch/rest/pom.xml | 114 +++- .../rest/ElasticSearchRestClient.java | 134 ++++ .../ElasticSearchRestCreateTableBuilder.java | 55 ++ .../rest/ElasticSearchRestDataContext.java | 226 ++----- 
.../ElasticSearchRestDataContextFactory.java | 46 +- .../rest/ElasticSearchRestDataSet.java | 65 ++ .../rest/ElasticSearchRestDeleteBuilder.java | 96 +++ .../rest/ElasticSearchRestInsertBuilder.java | 72 +++ .../rest/ElasticSearchRestUpdateCallback.java | 167 +++++ .../elasticsearch/rest/JestClientExecutor.java | 51 -- .../elasticsearch/rest/JestDeleteScroll.java | 57 -- .../JestElasticSearchCreateTableBuilder.java | 56 -- .../rest/JestElasticSearchDataSet.java | 124 ---- .../rest/JestElasticSearchDeleteBuilder.java | 76 --- .../rest/JestElasticSearchDropTableBuilder.java | 62 -- .../rest/JestElasticSearchInsertBuilder.java | 74 --- .../rest/JestElasticSearchMetaDataParser.java | 75 --- .../rest/JestElasticSearchUpdateCallback.java | 164 ----- .../rest/JestElasticSearchUtils.java | 90 --- .../ElasticSearchRestDataContexFactoryIT.java | 131 ++++ .../rest/ElasticSearchRestDataContextIT.java | 535 ++++++++++++++++ .../rest/JestElasticSearchDataContextTest.java | 615 ------------------- .../JestElasticSearchMetaDataParserTest.java | 70 --- .../rest/JestElasticSearchUtilsTest.java | 188 ------ .../rest/utils/EmbeddedElasticsearchServer.java | 72 --- .../rest/src/test/resources/Dockerfile | 5 + .../rest/src/test/resources/elasticsearch.yml | 13 + .../apache/metamodel/DataContextFactory.java | 6 +- hbase/pom.xml | 4 + pom.xml | 1 + 55 files changed, 2276 insertions(+), 2810 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 1b1e342..65df979 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,10 @@ before_install: services: - couchdb - mongodb - + - docker + +script: "mvn clean verify -P integration-test" + after_success: - mvn test javadoc:javadoc http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/CHANGES.md 
---------------------------------------------------------------------- diff --git a/CHANGES.md b/CHANGES.md index 5af3fc4..7ab1d22 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,6 @@ -### WIP +### Apache MetaModel 5.1.0 (WIP) + * [METAMODEL-1179] - Refactored ElasticSearch REST module to use new official REST based client from Elastic. * [METAMODEL-1177] - Made TableType.TABLE the default table type, replacing null values. ### Apache MetaModel 5.0.1 http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java new file mode 100644 index 0000000..c8ffae3 --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.metamodel.DataContext; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.QueryPostprocessDataContext; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.MutableColumn; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.util.SimpleTableDef; +import org.elasticsearch.common.unit.TimeValue; + +public abstract class AbstractElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, + UpdateableDataContext { + + public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60); + + protected final String indexName; + + // Table definitions that are set from the beginning, not supposed to be + // changed. + protected final List<SimpleTableDef> staticTableDefinitions; + + // Table definitions that are discovered, these can change + protected final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); + + /** + * Constructs a {@link ElasticSearchRestDataContext}. This constructor + * accepts a custom array of {@link SimpleTableDef}s which allows the user + * to define his own view on the indexes in the engine. + * + * @param indexName + * the name of the ElasticSearch index to represent + * @param tableDefinitions + * an array of {@link SimpleTableDef}s, which define the table + * and column model of the ElasticSearch index. + */ + public AbstractElasticSearchDataContext(String indexName, SimpleTableDef... 
tableDefinitions) { + super(false); + if (indexName == null || indexName.trim().length() == 0) { + throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName); + } + this.indexName = indexName; + this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections + .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions)); + } + + /** + * Performs an analysis of the available indexes in an ElasticSearch cluster + * {@link JestClient} instance and detects the elasticsearch types structure + * based on the metadata provided by the ElasticSearch java client. + * + * @see {@link #detectTable(JsonObject, String)} + * @return a mutable schema instance, useful for further fine tuning by the + * user. + */ + protected abstract SimpleTableDef[] detectSchema(); + + @Override + protected Schema getMainSchema() throws MetaModelException { + final MutableSchema theSchema = new MutableSchema(getMainSchemaName()); + for (final SimpleTableDef tableDef : staticTableDefinitions) { + addTable(theSchema, tableDef); + } + + final SimpleTableDef[] tables = detectSchema(); + synchronized (this) { + dynamicTableDefinitions.clear(); + dynamicTableDefinitions.addAll(Arrays.asList(tables)); + for (final SimpleTableDef tableDef : dynamicTableDefinitions) { + final List<String> tableNames = theSchema.getTableNames(); + + if (!tableNames.contains(tableDef.getName())) { + addTable(theSchema, tableDef); + } + } + } + + return theSchema; + } + + private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) { + final MutableTable table = tableDef.toTable().setSchema(theSchema); + final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID); + if (idColumn != null && idColumn instanceof MutableColumn) { + final MutableColumn mutableColumn = (MutableColumn) idColumn; + mutableColumn.setPrimaryKey(true); + } + theSchema.addTable(table); + } + + @Override + protected String getMainSchemaName() throws 
MetaModelException { + return indexName; + } + + /** + * Gets the name of the index that this {@link DataContext} is working on. + */ + public String getIndexName() { + return indexName; + } + + protected boolean limitMaxRowsIsSet(int maxRows) { + return (maxRows != -1); + } + + protected static SimpleTableDef[] sortTables(final List<SimpleTableDef> result) { + final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]); + Arrays.sort(tableDefArray, (o1, o2) -> o1.getName().compareTo(o2.getName())); + return tableDefArray; + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java new file mode 100644 index 0000000..fea2190 --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.metamodel.data.AbstractDataSet; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.SelectItem; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link DataSet} implementation for ElasticSearch + */ +public abstract class AbstractElasticSearchDataSet extends AbstractDataSet { + + private static final Logger logger = LoggerFactory.getLogger(AbstractElasticSearchDataSet.class); + + protected final AtomicBoolean _closed; + + protected SearchResponse _searchResponse; + protected SearchHit _currentHit; + protected int _hitIndex = 0; + + public AbstractElasticSearchDataSet(final SearchResponse searchResponse, final List<SelectItem> selectItems) { + super(selectItems); + _searchResponse = searchResponse; + _closed = new AtomicBoolean(false); + } + + @Override + public void close() { + super.close(); + boolean closeNow = _closed.compareAndSet(true, false); + if (closeNow) { + closeNow(); + } + } + + protected abstract void closeNow(); + + @Override + protected void finalize() throws Throwable { + super.finalize(); + if (!_closed.get()) { + logger.warn("finalize() invoked, but DataSet is not closed. 
Invoking close() on {}", this); + close(); + } + } + + @Override + public boolean next() { + final SearchHit[] hits = _searchResponse.getHits().getHits(); + if (hits.length == 0) { + // break condition for the scroll + _currentHit = null; + return false; + } + + if (_hitIndex < hits.length) { + // pick the next hit within this search response + _currentHit = hits[_hitIndex]; + _hitIndex++; + return true; + } + + final String scrollId = _searchResponse.getScrollId(); + if (scrollId == null) { + // this search response is not scrolleable - then it's the end. + _currentHit = null; + return false; + } + + // try to scroll to the next set of hits + try { + _searchResponse = scrollSearchResponse(scrollId); + } catch (IOException e) { + logger.warn("Failed to scroll to the next search response set.", e); + return false; + } + + // start over (recursively) + _hitIndex = 0; + return next(); + } + + protected abstract SearchResponse scrollSearchResponse(final String scrollId) throws IOException; + + @Override + public Row getRow() { + if (_currentHit == null) { + return null; + } + + final Map<String, Object> source = _currentHit.getSource(); + final String documentId = _currentHit.getId(); + return ElasticSearchUtils.createRow(source, documentId, getHeader()); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java index a6ce656..652fbe6 100644 --- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java +++ 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java @@ -20,6 +20,7 @@ package org.apache.metamodel.elasticsearch.common; import org.apache.metamodel.util.TimeComparator; +import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; @@ -30,12 +31,22 @@ import java.util.Date; */ public final class ElasticSearchDateConverter { + private static final DateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + private static final DateFormat FALLBACK_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + public static Date tryToConvert(String dateAsString) { + if (dateAsString == null) { + return null; + } + try { - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); - return dateFormat.parse(dateAsString); + return DEFAULT_DATE_FORMAT.parse(dateAsString); } catch (ParseException e) { - return TimeComparator.toDate(dateAsString); + try { + return FALLBACK_DATE_FORMAT.parse(dateAsString); + } catch (ParseException e1) { + return TimeComparator.toDate(dateAsString); + } } } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java new file mode 100644 index 0000000..32f07ff --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.common; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.metamodel.schema.ColumnType; + +/** + * Parser that transforms the ElasticSearch metadata response (json-like format) + * into an ElasticSearchMetaData object. + */ +public class ElasticSearchMetaDataParser { + + /** + * Parses the ElasticSearch meta data info into an ElasticSearchMetaData + * object. This method makes much easier to create the ElasticSearch schema. 
+ * + * @param metaDataInfo + * ElasticSearch mapping metadata in Map format + * @return An ElasticSearchMetaData object + */ + public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) { + final String[] fieldNames = new String[metaDataInfo.size() + 1]; + final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1]; + + // add the document ID field (fixed) + fieldNames[0] = ElasticSearchUtils.FIELD_ID; + columnTypes[0] = ColumnType.STRING; + + int i = 1; + for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) { + @SuppressWarnings("unchecked") + final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue(); + + fieldNames[i] = metaDataField.getKey(); + columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata); + i++; + + } + return new ElasticSearchMetaData(fieldNames, columnTypes); + } + + private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) { + final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata); + + if (metaDataFieldType == null) { + return ColumnType.STRING; + } + + return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType); + } + + private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) { + final Object type = metaDataField.get("type"); + if (type == null) { + return null; + } + return type.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java index b298d11..9128182 100644 --- 
a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java @@ -18,72 +18,43 @@ */ package org.apache.metamodel.elasticsearch.common; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.DefaultRow; +import org.apache.metamodel.data.Row; import org.apache.metamodel.query.FilterItem; import org.apache.metamodel.query.LogicalOperator; import org.apache.metamodel.query.OperatorType; +import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.ColumnType; import org.apache.metamodel.schema.MutableColumn; import org.apache.metamodel.schema.MutableTable; import org.apache.metamodel.util.CollectionUtils; -import org.elasticsearch.common.base.Strings; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ElasticSearchUtils { - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class); - public static final String FIELD_ID = "_id"; public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars"; - /** - * Gets a "filter" query which is both 1.x and 2.x compatible. 
- */ - private static QueryBuilder getFilteredQuery(String prefix, String fieldName) { - // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, - // FilterBuilders.missingFilter(fieldName)); - // 2.x: itemQueryBuilder = - // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName)); - try { - try { - Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class); - method.setAccessible(true); - return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName)); - } catch (NoSuchMethodException e) { - Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass( - "org.elasticsearch.index.query.FilterBuilders"); - Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class); - filterBuilderMethod.setAccessible(true); - Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery", - QueryBuilder.class, FilterBuilder.class); - return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke( - null, fieldName)); - } - } catch (Exception e) { - logger.error("Failed to resolve/invoke filtering method", e); - throw new IllegalStateException("Failed to resolve filtering method", e); - } - } - public static QueryBuilder getMissingQuery(String fieldName) { - return getFilteredQuery("missing", fieldName); + return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(fieldName)); } public static QueryBuilder getExistsQuery(String fieldName) { - return getFilteredQuery("exists", fieldName); + return new ExistsQueryBuilder(fieldName); } public static Map<String, ?> getMappingSource(final MutableTable table) { @@ -170,7 +141,7 @@ public class ElasticSearchUtils { } if (type.isLiteral()) { - return "string"; + return "text"; } else if (type == ColumnType.FLOAT) { return "float"; } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) { @@ -294,4 +265,35 @@ public class 
ElasticSearchUtils { } return columnType; } + + public static Row createRow(final Map<String, Object> sourceMap, final String documentId, final DataSetHeader header) { + final Object[] values = new Object[header.size()]; + for (int i = 0; i < values.length; i++) { + final SelectItem selectItem = header.getSelectItem(i); + final Column column = selectItem.getColumn(); + + assert column != null; + assert selectItem.getAggregateFunction() == null; + assert selectItem.getScalarFunction() == null; + + if (column.isPrimaryKey()) { + values[i] = documentId; + } else { + Object value = sourceMap.get(column.getName()); + + if (column.getType() == ColumnType.DATE) { + Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value); + if (valueToDate == null) { + values[i] = value; + } else { + values[i] = valueToDate; + } + } else { + values[i] = value; + } + } + } + + return new DefaultRow(header, values); + } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java new file mode 100644 index 0000000..9fb7e03 --- /dev/null +++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.common; + +import junit.framework.TestCase; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.MutableColumn; + +import java.util.*; + +public class ElasticSearchUtilsTest extends TestCase { + + public void testAssignDocumentIdForPrimaryKeys() throws Exception { + MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true); + SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn); + List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + Map<String, Object> values = new HashMap<>(); + values.put("value1", "theValue"); + Row row = ElasticSearchUtils.createRow(values, documentId, header); + String primaryKeyValue = (String) row.getValue(primaryKeyItem); + + assertEquals(primaryKeyValue, documentId); + } + + public void testCreateRowWithParsableDates() throws Exception { + SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING)); + SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE)); + List<SelectItem> selectItems1 = Arrays.asList(item1, item2); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + Map<String, Object> values = new 
HashMap<>(); + values.put("value1", "theValue"); + values.put("value2", "2013-01-04T15:55:51.217+01:00"); + Row row = ElasticSearchUtils.createRow(values, documentId, header); + Object stringValue = row.getValue(item1); + Object dateValue = row.getValue(item2); + + assertTrue(stringValue instanceof String); + assertTrue(dateValue instanceof Date); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml index 4c1abcf..6adca36 100644 --- a/elasticsearch/native/pom.xml +++ b/elasticsearch/native/pom.xml @@ -43,6 +43,17 @@ <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch.version}</version> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- test --> <dependency> @@ -53,7 +64,32 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> + <version>4.12</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.elasticsearch.test</groupId> + <artifactId>framework</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.9.1</version> + <scope>test</scope> </dependency> </dependencies> 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java index f27e8ac..4e5873c 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java @@ -27,6 +27,7 @@ import org.apache.metamodel.schema.MutableSchema; import org.apache.metamodel.schema.MutableTable; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.client.IndicesAdminClient; @@ -50,13 +51,16 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices(); final String indexName = dataContext.getIndexName(); - final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName) - .setType(table.getName()); + final PutMappingRequestBuilder requestBuilder = + new PutMappingRequestBuilder(indicesAdmin, PutMappingAction.INSTANCE).setIndices(indexName) + .setType(table.getName()); requestBuilder.setSource(source); final PutMappingResponse result = requestBuilder.execute().actionGet(); logger.debug("PutMapping response: 
acknowledged={}", result.isAcknowledged()); + dataContext.getElasticSearchClient().admin().indices().prepareRefresh(indexName).get(); + final MutableSchema schema = (MutableSchema) getSchema(); schema.addTable(table); return table; http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java index d2dfe4b..3df0ce1 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java @@ -18,39 +18,30 @@ */ package org.apache.metamodel.elasticsearch.nativeclient; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.metamodel.DataContext; -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.QueryPostprocessDataContext; import org.apache.metamodel.UpdateScript; import org.apache.metamodel.UpdateSummary; -import org.apache.metamodel.UpdateableDataContext; import org.apache.metamodel.data.DataSet; import org.apache.metamodel.data.DataSetHeader; import org.apache.metamodel.data.Row; import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext; import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser; import 
org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; import org.apache.metamodel.query.FilterItem; import org.apache.metamodel.query.LogicalOperator; import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.MutableColumn; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.MutableTable; -import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; import org.apache.metamodel.util.SimpleTableDef; -import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; -import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -59,14 +50,16 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.hppc.ObjectLookupContainer; -import org.elasticsearch.common.hppc.cursors.ObjectCursor; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.carrotsearch.hppc.ObjectLookupContainer; +import com.carrotsearch.hppc.cursors.ObjectCursor; + /** * DataContext implementation for ElasticSearch analytics engine. * @@ -84,20 +77,11 @@ import org.slf4j.LoggerFactory; * This implementation supports either automatic discovery of a schema or manual * specification of a schema, through the {@link SimpleTableDef} class. 
*/ -public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext { +public class ElasticSearchDataContext extends AbstractElasticSearchDataContext { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class); - public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60); - private final Client elasticSearchClient; - private final String indexName; - // Table definitions that are set from the beginning, not supposed to be - // changed. - private final List<SimpleTableDef> staticTableDefinitions; - - // Table definitions that are discovered, these can change - private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); /** * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a @@ -113,16 +97,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem * and column model of the ElasticSearch index. */ public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) { - super(false); + super(indexName, tableDefinitions); + if (client == null) { throw new IllegalArgumentException("ElasticSearch Client cannot be null"); } - if (indexName == null || indexName.trim().length() == 0) { - throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName); - } this.elasticSearchClient = client; - this.indexName = indexName; - this.staticTableDefinitions = Arrays.asList(tableDefinitions); this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); } @@ -140,40 +120,14 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem this(client, indexName, new SimpleTableDef[0]); } - /** - * Performs an analysis of the available indexes in an ElasticSearch cluster - * {@link Client} instance and detects the elasticsearch types structure - * based on the metadata provided by the ElasticSearch java client. 
- * - * @see {@link #detectTable(ClusterState, String, String)} - * @return a mutable schema instance, useful for further fine tuning by the - * user. - */ - private SimpleTableDef[] detectSchema() { + @Override + protected SimpleTableDef[] detectSchema() { logger.info("Detecting schema for index '{}'", indexName); - final ClusterState cs; final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster() - .prepareState(); - - // different methods here to set the index name, so we have to use - // reflection :-/ - try { - final byte majorVersion = Version.CURRENT.major; - final Object methodArgument = new String[] { indexName }; - if (majorVersion == 0) { - final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class); - method.invoke(clusterStateRequestBuilder, methodArgument); - } else { - final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class); - method.invoke(clusterStateRequestBuilder, methodArgument); - } - } catch (Exception e) { - logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e); - throw new MetaModelException("Failed to create request for index information needed to detect schema", e); - } - cs = clusterStateRequestBuilder.execute().actionGet().getState(); - + .prepareState().setIndices(indexName); + final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState(); + final List<SimpleTableDef> result = new ArrayList<>(); final IndexMetaData imd = cs.getMetaData().index(indexName); @@ -183,9 +137,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem } else { final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings(); final ObjectLookupContainer<String> documentTypes = mappings.keys(); - - for (final Object documentTypeCursor : documentTypes) { - final String documentType = ((ObjectCursor<?>) 
documentTypeCursor).value.toString(); + for (final ObjectCursor<?> documentTypeCursor : documentTypes) { + final String documentType = documentTypeCursor.value.toString(); try { final SimpleTableDef table = detectTable(cs, indexName, documentType); result.add(table); @@ -194,15 +147,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem } } } - final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]); - Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() { - @Override - public int compare(SimpleTableDef o1, SimpleTableDef o2) { - return o1.getName().compareTo(o2.getName()); - } - }); - - return tableDefArray; + return sortTables(result); } /** @@ -225,7 +170,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem // index does not exist throw new IllegalArgumentException("No such index: " + indexName); } - final MappingMetaData mappingMetaData = imd.mapping(documentType); + final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings(); + final MappingMetaData mappingMetaData = mappings.get(documentType); if (mappingMetaData == null) { throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType); } @@ -244,44 +190,6 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem } @Override - protected Schema getMainSchema() throws MetaModelException { - final MutableSchema theSchema = new MutableSchema(getMainSchemaName()); - for (final SimpleTableDef tableDef : staticTableDefinitions) { - addTable(theSchema, tableDef); - } - - final SimpleTableDef[] tables = detectSchema(); - synchronized (this) { - dynamicTableDefinitions.clear(); - dynamicTableDefinitions.addAll(Arrays.asList(tables)); - for (final SimpleTableDef tableDef : dynamicTableDefinitions) { - final List<String> tableNames = theSchema.getTableNames(); - - if (!tableNames.contains(tableDef.getName())) { - addTable(theSchema, 
tableDef); - } - } - } - - return theSchema; - } - - private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) { - final MutableTable table = tableDef.toTable().setSchema(theSchema); - final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID); - if (idColumn != null && idColumn instanceof MutableColumn) { - final MutableColumn mutableColumn = (MutableColumn) idColumn; - mutableColumn.setPrimaryKey(true); - } - theSchema.addTable(table); - } - - @Override - protected String getMainSchemaName() throws MetaModelException { - return indexName; - } - - @Override protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems, int firstRow, int maxRows) { final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, @@ -290,7 +198,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem // where clause can be pushed down to an ElasticSearch query final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder); final SearchResponse response = searchRequest.execute().actionGet(); - return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false); + return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems); } return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); } @@ -299,12 +207,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) { final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null); final SearchResponse response = searchRequest.execute().actionGet(); - return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false); + return new 
ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new) + .collect(Collectors.toList())); } private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) { final String documentType = table.getName(); - final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType); + final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType); if (firstRow > 1) { final int zeroBasedFrom = firstRow - 1; searchRequest.setFrom(zeroBasedFrom); @@ -332,7 +241,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem final String documentType = table.getName(); final String id = keyValue.toString(); - final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet(); + final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet(); if (!response.isExists()) { return null; @@ -343,7 +252,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem final DataSetHeader header = new SimpleDataSetHeader(selectItems); - return NativeElasticSearchUtils.createRow(source, documentId, header); + return ElasticSearchUtils.createRow(source, documentId, header); } @Override @@ -353,13 +262,11 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem return null; } final String documentType = table.getName(); - final CountResponse response = elasticSearchClient.prepareCount(indexName) - .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet(); - return response.getCount(); - } - - private boolean limitMaxRowsIsSet(int maxRows) { - return (maxRows != -1); + final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType); + final SearchResponse searchResponse = + 
getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query)) + .execute().actionGet(); + return searchResponse.getHits().getTotalHits(); } @Override @@ -370,6 +277,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem return callback.getUpdateSummary(); } + @Override + protected void onSchemaCacheRefreshed() { + getElasticSearchClient().admin().indices().prepareRefresh(indexName).get(); + + detectSchema(); + } + /** * Gets the {@link Client} that this {@link DataContext} is wrapping. * @@ -378,13 +292,4 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem public Client getElasticSearchClient() { return elasticSearchClient; } - - /** - * Gets the name of the index that this {@link DataContext} is working on. - * - * @return - */ - public String getIndexName() { - return indexName; - } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java index 94359c4..a6f6953 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java @@ -18,6 +18,9 @@ */ package org.apache.metamodel.elasticsearch.nativeclient; +import java.net.InetAddress; +import java.net.UnknownHostException; + import org.apache.metamodel.ConnectionException; import org.apache.metamodel.DataContext; import org.apache.metamodel.factory.DataContextFactory; @@ 
-27,12 +30,11 @@ import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException; import org.apache.metamodel.util.SimpleTableDef; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Factory for ElasticSearch data context of native type. @@ -59,6 +61,7 @@ import org.elasticsearch.node.NodeBuilder; * </ul> */ public class ElasticSearchDataContextFactory implements DataContextFactory { + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContextFactory.class); @Override public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) { @@ -120,46 +123,22 @@ public class ElasticSearchDataContextFactory implements DataContextFactory { @Override public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) throws UnsupportedDataContextPropertiesException, ConnectionException { - final String clientType = getClientType(properties); final Client client; - if ("node".equals(clientType)) { - client = createNodeClient(properties); - } else { client = createTransportClient(properties); - } final String indexName = getIndex(properties); final SimpleTableDef[] tableDefinitions = properties.getTableDefs(); return new ElasticSearchDataContext(client, indexName, tableDefinitions); } private Client createTransportClient(DataContextProperties properties) { - final Builder settingsBuilder = ImmutableSettings.builder(); - settingsBuilder.put("name", 
"MetaModel"); - settingsBuilder.put("cluster.name", getCluster(properties)); - if (properties.getUsername() != null && properties.getPassword() != null) { - settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword()); - if ("true".equals(properties.toMap().get("ssl"))) { - if (properties.toMap().get("keystorePath") != null) { - settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath")); - settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword")); - } - settingsBuilder.put("shield.transport.ssl", "true"); - } + final Settings settings = Settings.builder().put().put("name", "MetaModel").put("cluster.name", getCluster(properties)).build(); + final TransportClient client = new PreBuiltTransportClient(settings); + try { + client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(properties.getHostname()), properties.getPort())); + } catch (UnknownHostException e) { + logger.warn("no IP address for the host with name \"{}\" could be found.", properties.getHostname()); } - final Settings settings = settingsBuilder.build(); - - final TransportClient client = new TransportClient(settings); - client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort())); return client; } - private Client createNodeClient(DataContextProperties properties) { - final Builder settingsBuilder = ImmutableSettings.builder(); - settingsBuilder.put("name", "MetaModel"); - settingsBuilder.put("shield.enabled", false); - final Settings settings = settingsBuilder.build(); - final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings) - .node(); - return node.client(); - } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java 
---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java index 4ced2c8..b616eb2 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java @@ -19,104 +19,38 @@ package org.apache.metamodel.elasticsearch.nativeclient; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.metamodel.data.AbstractDataSet; import org.apache.metamodel.data.DataSet; -import org.apache.metamodel.data.Row; +import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet; import org.apache.metamodel.query.SelectItem; +import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.search.SearchHit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link DataSet} implementation for ElasticSearch */ -final class ElasticSearchDataSet extends AbstractDataSet { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class); +final class ElasticSearchDataSet extends AbstractElasticSearchDataSet { private final Client _client; - private final AtomicBoolean _closed; - - private SearchResponse _searchResponse; - private SearchHit _currentHit; - private int _hitIndex = 0; - public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems, - boolean queryPostProcessed) { - super(selectItems); + public ElasticSearchDataSet(final Client client, final 
SearchResponse searchResponse, + final List<SelectItem> selectItems) { + super(searchResponse, selectItems); _client = client; - _searchResponse = searchResponse; - _closed = new AtomicBoolean(false); } - @Override - public void close() { - super.close(); - boolean closeNow = _closed.compareAndSet(true, false); - if (closeNow) { - ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client) - .addScrollId(_searchResponse.getScrollId()); - scrollRequestBuilder.execute(); - } + public void closeNow() { + ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client, + ClearScrollAction.INSTANCE).addScrollId(_searchResponse.getScrollId()); + scrollRequestBuilder.execute(); } @Override - protected void finalize() throws Throwable { - super.finalize(); - if (!_closed.get()) { - logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this); - close(); - } - } - - @Override - public boolean next() { - final SearchHit[] hits = _searchResponse.getHits().hits(); - if (hits.length == 0) { - // break condition for the scroll - _currentHit = null; - return false; - } - - if (_hitIndex < hits.length) { - // pick the next hit within this search response - _currentHit = hits[_hitIndex]; - _hitIndex++; - return true; - } - - final String scrollId = _searchResponse.getScrollId(); - if (scrollId == null) { - // this search response is not scrolleable - then it's the end. 
- _currentHit = null; - return false; - } - - // try to scroll to the next set of hits - _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL) + protected SearchResponse scrollSearchResponse(final String scrollId) { + return _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL) .execute().actionGet(); - - // start over (recursively) - _hitIndex = 0; - return next(); - } - - @Override - public Row getRow() { - if (_currentHit == null) { - return null; - } - - final Map<String, Object> source = _currentHit.getSource(); - final String documentId = _currentHit.getId(); - final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader()); - return row; } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java index 0de2a71..2db8e8c 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.metamodel.elasticsearch.nativeclient; +import java.util.Iterator; import java.util.List; import org.apache.metamodel.MetaModelException; @@ -27,9 +28,11 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; import org.apache.metamodel.query.FilterItem; import org.apache.metamodel.query.LogicalOperator; import org.apache.metamodel.schema.Table; -import 
org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,10 +60,6 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { final Client client = dataContext.getElasticSearchClient(); final String indexName = dataContext.getIndexName(); - final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client); - deleteByQueryRequestBuilder.setIndices(indexName); - deleteByQueryRequestBuilder.setTypes(documentType); - final List<FilterItem> whereItems = getWhereItems(); // delete by query - note that creteQueryBuilderForSimpleWhere may @@ -74,9 +73,21 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " + whereItems); } - deleteByQueryRequestBuilder.setQuery(queryBuilder); - deleteByQueryRequestBuilder.execute().actionGet(); - logger.debug("Deleted documents by query."); + final SearchResponse response = + client.prepareSearch(indexName).setQuery(queryBuilder).setTypes(documentType).execute() + .actionGet(); + + client.admin().indices().prepareRefresh(indexName).execute().actionGet(); + final Iterator<SearchHit> iterator = response.getHits().iterator(); + while (iterator.hasNext()) { + final SearchHit hit = iterator.next(); + final String typeId = hit.getId(); + final DeleteResponse deleteResponse = + client.prepareDelete().setIndex(indexName).setType(documentType).setId(typeId).execute() + .actionGet(); + logger.debug("Deleted documents by query." 
+ deleteResponse.getResult()); + } + client.admin().indices().prepareRefresh(indexName).execute().actionGet(); } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java deleted file mode 100644 index d66b240..0000000 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.metamodel.elasticsearch.nativeclient; - -import java.lang.reflect.Method; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.drop.AbstractTableDropBuilder; -import org.apache.metamodel.drop.TableDropBuilder; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.Table; -import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link TableDropBuilder} for dropping tables (document types) in an - * ElasticSearch index. - */ -final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class); - - private final ElasticSearchUpdateCallback _updateCallback; - - public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { - super(table); - _updateCallback = updateCallback; - } - - @Override - public void execute() throws MetaModelException { - final ElasticSearchDataContext dataContext = _updateCallback.getDataContext(); - final Table table = getTable(); - final String documentType = table.getName(); - logger.info("Deleting mapping / document type: {}", documentType); - final Client client = dataContext.getElasticSearchClient(); - final IndicesAdminClient indicesAdminClient = client.admin().indices(); - final String indexName = dataContext.getIndexName(); - - final DeleteMappingRequestBuilder requestBuilder = new DeleteMappingRequestBuilder(indicesAdminClient) - .setIndices(indexName); - setType(requestBuilder, documentType); - - final DeleteMappingResponse result = requestBuilder.execute().actionGet(); - logger.debug("Delete mapping response: acknowledged={}", 
result.isAcknowledged()); - - final MutableSchema schema = (MutableSchema) table.getSchema(); - schema.removeTable(table); - } - - /** - * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} method - * using reflection. This is done because the API of ElasticSearch was - * changed and the method signature differes between different versions. - * - * @param requestBuilder - * @param documentType - */ - private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) { - Object argument; - Method method; - try { - try { - method = requestBuilder.getClass().getDeclaredMethod("setType", String[].class); - argument = new String[] {documentType}; - } catch (NoSuchMethodException e) { - logger.debug("No setType(String[]) method found, trying with a single String instead", e); - method = requestBuilder.getClass().getDeclaredMethod("setType", String.class); - argument = documentType; - } - } catch (Exception e) { - logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e); - throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) 
method", e); - } - try { - method.setAccessible(true); - method.invoke(requestBuilder, argument); - } catch (Exception e) { - logger.error("Failed to invoke {}", method, e); - throw new IllegalStateException("Failed to invoke " + method, e); - } - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java index 70d31b4..a1c7f69 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java @@ -26,6 +26,7 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; import org.apache.metamodel.insert.AbstractRowInsertionBuilder; import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; @@ -46,7 +47,8 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast final Client client = dataContext.getElasticSearchClient(); final String indexName = dataContext.getIndexName(); final String documentType = getTable().getName(); - final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType); + final IndexRequestBuilder requestBuilder = + new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(indexName).setType(documentType); final Map<String, 
Object> valueMap = new HashMap<>(); final Column[] columns = getColumns(); @@ -68,11 +70,11 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast assert !valueMap.isEmpty(); requestBuilder.setSource(valueMap); - requestBuilder.setCreate(true); final IndexResponse result = requestBuilder.execute().actionGet(); logger.debug("Inserted document: id={}", result.getId()); - } + client.admin().indices().prepareRefresh(indexName).execute().actionGet(); + } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java deleted file mode 100644 index c0a1232..0000000 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.metamodel.elasticsearch.nativeclient; - -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; -import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.schema.ColumnType; - -/** - * Parser that transforms the ElasticSearch metadata response (json-like format) - * into an ElasticSearchMetaData object. - */ -public class ElasticSearchMetaDataParser { - - /** - * Parses the ElasticSearch meta data info into an ElasticSearchMetaData - * object. This method makes much easier to create the ElasticSearch schema. - * - * @param metaDataInfo - * ElasticSearch mapping metadata in Map format - * @return An ElasticSearchMetaData object - */ - public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) { - final String[] fieldNames = new String[metaDataInfo.size() + 1]; - final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1]; - - // add the document ID field (fixed) - fieldNames[0] = ElasticSearchUtils.FIELD_ID; - columnTypes[0] = ColumnType.STRING; - - int i = 1; - for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) { - @SuppressWarnings("unchecked") - final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue(); - - fieldNames[i] = metaDataField.getKey(); - columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata); - i++; - - } - return new ElasticSearchMetaData(fieldNames, columnTypes); - } - - private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) { - final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata); - - if (metaDataFieldType == null) { - return ColumnType.STRING; - } - - return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType); - } - - private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) { - final Object type = metaDataField.get("type"); - if 
(type == null) { - return null; - } - return type.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java new file mode 100644 index 0000000..eeee6fc --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.AbstractRowUpdationBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticSearchUpdateBuilder extends AbstractRowUpdationBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUpdateBuilder.class); + + private final ElasticSearchUpdateCallback _updateCallback; + + public ElasticSearchUpdateBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + + final Table table = getTable(); + final String documentType = table.getName(); + + final ElasticSearchDataContext dataContext = _updateCallback.getDataContext(); + final Client client = dataContext.getElasticSearchClient(); + final String indexName = dataContext.getIndexName(); + final List<FilterItem> whereItems = getWhereItems(); + + // delete by query - note that creteQueryBuilderForSimpleWhere may + // return matchAllQuery() if no where items are present. 
+ final QueryBuilder queryBuilder = + ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND); + if (queryBuilder == null) { + // TODO: The where items could not be pushed down to a query. We + // could solve this by running a query first, gather all + // document IDs and then delete by IDs. + throw new UnsupportedOperationException( + "Could not push down WHERE items to delete by query request: " + whereItems); + } + + final SearchResponse response = client.prepareSearch(indexName).setQuery(queryBuilder).execute().actionGet(); + + final Iterator<SearchHit> iterator = response.getHits().iterator(); + while (iterator.hasNext()) { + final SearchHit hit = iterator.next(); + final String typeId = hit.getId(); + + final UpdateRequestBuilder requestBuilder = + new UpdateRequestBuilder(client, UpdateAction.INSTANCE).setIndex(indexName).setType(documentType) + .setId(typeId); + + final Map<String, Object> valueMap = new HashMap<>(); + final Column[] columns = getColumns(); + final Object[] values = getValues(); + for (int i = 0; i < columns.length; i++) { + if (isSet(columns[i])) { + final String name = columns[i].getName(); + final Object value = values[i]; + if (ElasticSearchUtils.FIELD_ID.equals(name)) { + if (value != null) { + requestBuilder.setId(value.toString()); + } + } else { + valueMap.put(name, value); + } + } + } + + assert !valueMap.isEmpty(); + + requestBuilder.setDoc(valueMap); + + final UpdateResponse updateResponse = requestBuilder.execute().actionGet(); + + logger.debug("Update document: id={}", updateResponse.getId()); + + client.admin().indices().prepareRefresh(indexName).get(); + } + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java ---------------------------------------------------------------------- diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java index b81c9c7..c4cbbac 100644 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java @@ -26,6 +26,7 @@ import org.apache.metamodel.drop.TableDropBuilder; import org.apache.metamodel.insert.RowInsertionBuilder; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.RowUpdationBuilder; import org.elasticsearch.client.Client; /** @@ -50,13 +51,13 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback { @Override public boolean isDropTableSupported() { - return true; + return false; } @Override public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException { - return new ElasticSearchDropTableBuilder(this, table); + throw new UnsupportedOperationException(); } @Override @@ -76,6 +77,11 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback { return new ElasticSearchDeleteBuilder(this, table); } + @Override + public RowUpdationBuilder update(final Table table) { + return new ElasticSearchUpdateBuilder(this, table); + } + public void onExecuteUpdateFinished() { // force refresh of the index final ElasticSearchDataContext dataContext = getDataContext(); http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java ---------------------------------------------------------------------- diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java deleted file mode 100644 index 822ef1b..0000000 --- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.nativeclient; - -import java.util.Date; -import java.util.Map; - -import org.apache.metamodel.data.DataSetHeader; -import org.apache.metamodel.data.DefaultRow; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter; -import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.ColumnType; - -/** - * Shared/common util functions for the ElasticSearch MetaModel module. 
- */ -final class NativeElasticSearchUtils { - - public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) { - final Object[] values = new Object[header.size()]; - for (int i = 0; i < values.length; i++) { - final SelectItem selectItem = header.getSelectItem(i); - final Column column = selectItem.getColumn(); - - assert column != null; - assert selectItem.getAggregateFunction() == null; - assert selectItem.getScalarFunction() == null; - - if (column.isPrimaryKey()) { - values[i] = documentId; - } else { - Object value = sourceMap.get(column.getName()); - - if (column.getType() == ColumnType.DATE) { - Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value); - if (valueToDate == null) { - values[i] = value; - } else { - values[i] = valueToDate; - } - } else { - values[i] = value; - } - } - } - - return new DefaultRow(header, values); - } -}
