Added support for DELETE and thereby also simplified UPDATE statements on ElasticSearch. Fixes METAMODEL-79.
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/830d1c76 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/830d1c76 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/830d1c76 Branch: refs/heads/master Commit: 830d1c76530cb4129061fc6bb5335c3834c29c64 Parents: a93bd9c Author: Kasper Sørensen <[email protected]> Authored: Sun Mar 8 16:05:10 2015 +0100 Committer: Kasper Sørensen <[email protected]> Committed: Sun Mar 8 16:05:10 2015 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticSearchDataContext.java | 124 ++++++++++++++- .../elasticsearch/ElasticSearchDataSet.java | 10 ++ .../ElasticSearchDeleteBuilder.java | 96 ++++++++++++ .../ElasticSearchInsertBuilder.java | 76 ++++++++++ .../ElasticSearchInsertIntoBuilder.java | 76 ---------- .../ElasticSearchUpdateCallback.java | 22 +-- .../ElasticSearchDataContextTest.java | 150 ++++++++++++++++++- 7 files changed, 447 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java index d465144..c2d7f1b 100644 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java @@ -36,6 +36,8 @@ import org.apache.metamodel.data.DataSetHeader; import org.apache.metamodel.data.Row; import org.apache.metamodel.data.SimpleDataSetHeader; import org.apache.metamodel.query.FilterItem; +import 
org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.query.OperatorType; import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.MutableColumn; @@ -43,6 +45,7 @@ import org.apache.metamodel.schema.MutableSchema; import org.apache.metamodel.schema.MutableTable; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; +import org.apache.metamodel.util.CollectionUtils; import org.apache.metamodel.util.SimpleTableDef; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; @@ -58,6 +61,8 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.hppc.ObjectLookupContainer; import org.elasticsearch.common.hppc.cursors.ObjectCursor; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,17 +255,126 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem } @Override + protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, + List<FilterItem> whereItems, int firstRow, int maxRows) { + final QueryBuilder queryBuilder = createQueryBuilderForSimpleWhere(table, whereItems, LogicalOperator.AND); + if (queryBuilder != null) { + // where clause can be pushed down to an ElasticSearch query + final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder); + final SearchResponse response = searchRequest.execute().actionGet(); + return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false); + } + return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); + } + + @Override protected DataSet 
materializeMainSchemaTable(Table table, Column[] columns, int maxRows) { + final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null); + final SearchResponse response = searchRequest.execute().actionGet(); + return new ElasticSearchDataSet(elasticSearchClient, response, columns, false); + } + + private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) { final String documentType = table.getName(); - final SearchRequestBuilder requestBuilder = elasticSearchClient.prepareSearch(indexName).setTypes(documentType); + final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType); + if (firstRow > 1) { + final int zeroBasedFrom = firstRow - 1; + searchRequest.setFrom(zeroBasedFrom); + } if (limitMaxRowsIsSet(maxRows)) { - requestBuilder.setSize(maxRows); + searchRequest.setSize(maxRows); } else { - requestBuilder.setScroll(TIMEOUT_SCROLL); + searchRequest.setScroll(TIMEOUT_SCROLL); } - final SearchResponse response = requestBuilder.execute().actionGet(); - return new ElasticSearchDataSet(elasticSearchClient, response, columns, false); + if (queryBuilder != null) { + searchRequest.setQuery(queryBuilder); + } + + return searchRequest; + } + + /** + * Creates, if possible, a {@link QueryBuilder} object which can be used to + * push down one or more {@link FilterItem}s to ElasticSearch's backend. 
+ * + * @param table the table that the where items refer to + * @param whereItems the filter items to translate into a query + * @param logicalOperator the operator used to combine the items when there is more than one + * @return a {@link QueryBuilder} if one was produced, or null if the items + * could not be pushed down to an ElasticSearch query + */ + protected QueryBuilder createQueryBuilderForSimpleWhere(Table table, List<FilterItem> whereItems, + LogicalOperator logicalOperator) { + if (whereItems.isEmpty()) { + return QueryBuilders.matchAllQuery(); + } + + List<QueryBuilder> children = new ArrayList<QueryBuilder>(whereItems.size()); + for (FilterItem item : whereItems) { + final QueryBuilder itemQueryBuilder; + + if (item.isCompoundFilter()) { + final List<FilterItem> childItems = Arrays.asList(item.getChildItems()); + itemQueryBuilder = createQueryBuilderForSimpleWhere(table, childItems, item.getLogicalOperator()); + if (itemQueryBuilder == null) { + // something was not supported, so we have to forfeit here + // too. + return null; + } + } else { + final Column column = item.getSelectItem().getColumn(); + if (column == null) { + // unsupported type of where item - must have a column + // reference + return null; + } + final String fieldName = column.getName(); + final Object operand = item.getOperand(); + final OperatorType operator = item.getOperator(); + + switch (operator) { + case EQUALS_TO: + itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand); + break; + case DIFFERENT_FROM: + itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName, operand)); + break; + case IN: + final List<?> operands = CollectionUtils.toList(operand); + itemQueryBuilder = QueryBuilders.termsQuery(fieldName, operands); + break; + case LIKE: + case GREATER_THAN_OR_EQUAL: + case GREATER_THAN: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + default: + // not (yet) supported operator types + return null; + } + } + + children.add(itemQueryBuilder); + } + + // just one where item - just return the child query builder + if (children.size() == 1) { + return children.get(0); + } + + // build a bool 
query + final BoolQueryBuilder result = QueryBuilders.boolQuery(); + for (QueryBuilder child : children) { + // each case needs its own break - without it AND falls through and also adds the child as a should clause + switch (logicalOperator) { + case AND: + result.must(child); + break; + case OR: + result.should(child); + break; + } + } + + return result; } @Override http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java index e4f1054..8c41524 100644 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java @@ -18,12 +18,14 @@ */ package org.apache.metamodel.elasticsearch; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.metamodel.data.AbstractDataSet; import org.apache.metamodel.data.DataSet; import org.apache.metamodel.data.Row; +import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; import org.elasticsearch.action.search.ClearScrollRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -46,6 +48,14 @@ final class ElasticSearchDataSet extends AbstractDataSet { private SearchHit _currentHit; private int _hitIndex = 0; + public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems, + boolean queryPostProcessed) { + super(selectItems); + _client = client; + _searchResponse = searchResponse; + _closed = new AtomicBoolean(false); + } + public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns, boolean queryPostProcessed) { super(columns); 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java new file mode 100644 index 0000000..05d1f15 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch; + +import java.util.List; +import java.util.Map; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.delete.AbstractRowDeletionBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; +import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; +import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link RowDeletionBuilder} implementation for + * {@link ElasticSearchDataContext}. + */ +final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDeleteBuilder.class); + + private final ElasticSearchUpdateCallback _updateCallback; + + public ElasticSearchDeleteBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + final Table table = getTable(); + final String documentType = table.getName(); + + final ElasticSearchDataContext dataContext = _updateCallback.getDataContext(); + final Client client = dataContext.getElasticSearchClient(); + final String indexName = dataContext.getIndexName(); + + final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client); + deleteByQueryRequestBuilder.setIndices(indexName); + deleteByQueryRequestBuilder.setTypes(documentType); + + final List<FilterItem> whereItems = getWhereItems(); + if (whereItems.isEmpty()) { + // truncate the 
index + deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); + } else { + // delete by query + final QueryBuilder queryBuilder = dataContext.createQueryBuilderForSimpleWhere(table, whereItems, + LogicalOperator.AND); + if (queryBuilder == null) { + // TODO: The where items could not be pushed down to a query. We + // could solve this by running a query first, gather all + // document IDs and then delete by IDs. + throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " + + whereItems); + } + deleteByQueryRequestBuilder.setQuery(queryBuilder); + } + + final DeleteByQueryResponse response = deleteByQueryRequestBuilder.execute().actionGet(); + + if (logger.isDebugEnabled()) { + final Map<String, Object> headers = response.getHeaders(); + final IndexDeleteByQueryResponse indexResponse = response.getIndex(indexName); + final Map<String, Object> indexHeaders = indexResponse.getHeaders(); + + logger.debug("Deleted documents by query. Response headers: {}, Index headers: {}", headers, indexHeaders); + } + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertBuilder.java new file mode 100644 index 0000000..d4302a7 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.insert.AbstractRowInsertionBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchUpdateCallback> { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchInsertBuilder.class); + + public ElasticSearchInsertBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { + super(updateCallback, table); + } + + @Override + public void execute() throws MetaModelException { + final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext(); + final Client client = dataContext.getElasticSearchClient(); + final String indexName = dataContext.getIndexName(); + final String documentType = getTable().getName(); + final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType); + + final Map<String, Object> valueMap = new HashMap<String, Object>(); + final Column[] columns = getColumns(); + 
final Object[] values = getValues(); + for (int i = 0; i < columns.length; i++) { + if (isSet(columns[i])) { + final String name = columns[i].getName(); + final Object value = values[i]; + if (ElasticSearchDataContext.FIELD_ID.equals(name)) { + if (value != null) { + requestBuilder.setId(value.toString()); + } + } else { + valueMap.put(name, value); + } + } + } + + assert !valueMap.isEmpty(); + + requestBuilder.setSource(valueMap); + requestBuilder.setCreate(true); + + final IndexResponse result = requestBuilder.execute().actionGet(); + logger.debug("Inserted document: id={}, created={}", result.getId(), result.isCreated()); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertIntoBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertIntoBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertIntoBuilder.java deleted file mode 100644 index 754624f..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchInsertIntoBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.insert.AbstractRowInsertionBuilder; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.Table; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class ElasticSearchInsertIntoBuilder extends AbstractRowInsertionBuilder<ElasticSearchUpdateCallback> { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchInsertIntoBuilder.class); - - public ElasticSearchInsertIntoBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { - super(updateCallback, table); - } - - @Override - public void execute() throws MetaModelException { - final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext(); - final Client client = dataContext.getElasticSearchClient(); - final String indexName = dataContext.getIndexName(); - final String documentType = getTable().getName(); - final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType); - - final Map<String, Object> valueMap = new HashMap<String, Object>(); - final Column[] columns = getColumns(); - final Object[] values = getValues(); - for (int i = 0; i < columns.length; i++) { - if (isSet(columns[i])) { - final String name = columns[i].getName(); - final Object value = values[i]; - if (ElasticSearchDataContext.FIELD_ID.equals(name)) { - if (value != null) { - requestBuilder.setId(value.toString()); - } - } else { - valueMap.put(name, value); - } - } - } - - assert !valueMap.isEmpty(); - - requestBuilder.setSource(valueMap); - requestBuilder.setCreate(true); - - 
final IndexResponse result = requestBuilder.execute().actionGet(); - logger.debug("Inserted document: id={}, created={}", result.getId(), result.isCreated()); - } - -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchUpdateCallback.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchUpdateCallback.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchUpdateCallback.java index e162cc6..a3c6629 100644 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchUpdateCallback.java +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchUpdateCallback.java @@ -26,7 +26,6 @@ import org.apache.metamodel.drop.TableDropBuilder; import org.apache.metamodel.insert.RowInsertionBuilder; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; -import org.apache.metamodel.update.RowUpdationBuilder; import org.elasticsearch.client.Client; /** @@ -63,33 +62,18 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback { @Override public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException { - return new ElasticSearchInsertIntoBuilder(this, table); - } - - @Override - public boolean isUpdateSupported() { - // TODO - return false; - } - - @Override - public RowUpdationBuilder update(Table table) throws IllegalArgumentException, IllegalStateException, - UnsupportedOperationException { - // TODO - throw new UnsupportedOperationException(); + return new ElasticSearchInsertBuilder(this, table); } @Override public boolean isDeleteSupported() { - // TODO - return false; + return true; } @Override public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, 
IllegalStateException, UnsupportedOperationException { - // TODO - throw new UnsupportedOperationException(); + return new ElasticSearchDeleteBuilder(this, table); } public void onExecuteUpdateFinished() { http://git-wip-us.apache.org/repos/asf/metamodel/blob/830d1c76/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java index 6544686..0c48391 100644 --- a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java +++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java @@ -19,10 +19,7 @@ package org.apache.metamodel.elasticsearch; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.util.Arrays; import java.util.Date; @@ -32,14 +29,16 @@ import java.util.Map; import javax.swing.table.TableModel; +import org.apache.metamodel.MetaModelHelper; import org.apache.metamodel.UpdateCallback; import org.apache.metamodel.UpdateScript; import org.apache.metamodel.UpdateableDataContext; import org.apache.metamodel.create.CreateTable; import org.apache.metamodel.data.DataSet; import org.apache.metamodel.data.DataSetTableModel; -import org.apache.metamodel.data.FilteredDataSet; import org.apache.metamodel.data.InMemoryDataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.delete.DeleteFrom; import org.apache.metamodel.drop.DropTable; import org.apache.metamodel.elasticsearch.utils.EmbeddedElasticsearchServer; 
import org.apache.metamodel.query.FunctionType; @@ -49,6 +48,7 @@ import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.ColumnType; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.Update; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -206,7 +206,8 @@ public class ElasticSearchDataContextTest { final Column fooColumn = table.getColumnByName("foo"); final Column idColumn = table.getPrimaryKeys()[0]; - assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]", idColumn.toString()); + assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]", + idColumn.toString()); dataContext.executeUpdate(new UpdateScript() { @Override @@ -233,6 +234,126 @@ public class ElasticSearchDataContextTest { } @Test + public void testDeleteAll() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() + .toQuery()); + assertEquals("Row[values=[0]]", row.toString()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void 
testDeleteByQuery() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, + dataContext.query().from(table).select("foo", "bar").toQuery()); + assertEquals("Row[values=[world, 43]]", row.toString()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void testDeleteUnsupportedQueryType() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + // greater than is not yet supported + try { + dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); + fail("Exception expected"); + } catch (UnsupportedOperationException e) { + assertEquals("Could not push down WHERE items to delete by 
query request: [testCreateTable.bar > 40]", + e.getMessage()); + } + + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test + public void testUpdateRow() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42)); + + DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); + assertTrue(dataSet.next()); + assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); + assertTrue(dataSet.next()); + assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); + assertFalse(dataSet.next()); + dataSet.close(); + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test public void testDropTable() throws Exception { Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); @@ -262,7 +383,22 @@ public class ElasticSearchDataContextTest { public void testWhereColumnEqualsValues() throws Exception { DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") .isEquals("user4").execute(); - assertEquals(FilteredDataSet.class, ds.getClass()); + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + try { + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } 
finally { + ds.close(); + } + } + + @Test + public void testWhereMultiColumnsEqualValues() throws Exception { + DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").and("message").ne(5).execute(); + assertEquals(ElasticSearchDataSet.class, ds.getClass()); try { assertTrue(ds.next());
