http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java index ec5ecba..983ba5e 100644 --- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java @@ -19,12 +19,6 @@ package org.apache.metamodel.elasticsearch.nativeclient; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; @@ -45,9 +39,7 @@ import org.apache.metamodel.data.DataSetTableModel; import org.apache.metamodel.data.InMemoryDataSet; import org.apache.metamodel.data.Row; import org.apache.metamodel.delete.DeleteFrom; -import org.apache.metamodel.drop.DropTable; import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer; import org.apache.metamodel.query.FunctionType; import org.apache.metamodel.query.Query; import org.apache.metamodel.query.SelectItem; @@ -58,20 +50,18 @@ import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; import org.apache.metamodel.update.Update; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -public class ElasticSearchDataContextTest { +public class ElasticSearchDataContextTest extends ESSingleNodeTestCase { private static final String indexName = "twitter"; private static final String indexType1 = "tweet1"; @@ -81,14 +71,15 @@ public class ElasticSearchDataContextTest { private static final String bulkIndexType = "bulktype"; private static final String peopleIndexType = "peopletype"; private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}"; - private static EmbeddedElasticsearchServer embeddedElasticsearchServer; - private static Client client; - private static UpdateableDataContext dataContext; - - @BeforeClass - public static void beforeTests() throws Exception { - embeddedElasticsearchServer = new EmbeddedElasticsearchServer(); - client = embeddedElasticsearchServer.getClient(); + private Client client; + private UpdateableDataContext dataContext; + + @Before + public void beforeTests() throws Exception { + client = client(); + + dataContext = new ElasticSearchDataContext(client, indexName); + indexTweeterDocument(indexType1, 1); indexTweeterDocument(indexType2, 1); indexTweeterDocument(indexType2, 2, null); @@ -96,15 +87,10 @@ public class ElasticSearchDataContextTest { indexTweeterDocument(indexType2, 1); indexBulkDocuments(indexName, bulkIndexType, 10); - // The refresh API allows to explicitly refresh one or more index, - // making all operations performed since the last refresh available for - // search - embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet(); - dataContext = new ElasticSearchDataContext(client, indexName); - System.out.println("Embedded ElasticSearch server created!"); + dataContext.refreshSchemas(); } - private static void insertPeopleDocuments() throws IOException { + private void insertPeopleDocuments() throws IOException { indexOnePeopleDocument("female", 20, 5); indexOnePeopleDocument("female", 17, 8); indexOnePeopleDocument("female", 18, 9); @@ -116,10 +102,9 @@ public class ElasticSearchDataContextTest { indexOnePeopleDocument("male", 18, 4); } - @AfterClass - public static void afterTests() { - embeddedElasticsearchServer.shutdown(); - System.out.println("Embedded ElasticSearch server shut down!"); + @After + public void afterTests() { + client.admin().indices().delete(new DeleteIndexRequest("_all")).actionGet(); } @Test @@ -128,9 +113,15 @@ public class ElasticSearchDataContextTest { Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray())); Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + try (DataSet ds = dataContext.query().from(indexType1).select("_id").execute()) { + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + assertTrue(ds.next()); + assertEquals("Row[values=[tweet_tweet1_1]]",ds.getRow().toString()); + } assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray())); + assertEquals(ColumnType.STRING, table.getColumnByName("_id").getType()); assertEquals(ColumnType.STRING, table.getColumnByName("user").getType()); assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType()); assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType()); @@ -201,7 +192,7 @@ public class ElasticSearchDataContextTest { } @Test - public void testCreateTableInsertQueryAndDrop() throws Exception { + public void testCreateTableAndInsertQuery() throws Exception { final Schema schema = dataContext.getDefaultSchema(); final CreateTable createTable = new CreateTable(schema, "testCreateTable"); createTable.withColumn("foo").ofType(ColumnType.STRING); @@ -235,42 +226,42 @@ public class ElasticSearchDataContextTest { assertNotNull(ds.getRow().getValue(idColumn)); assertFalse(ds.next()); } - - dataContext.executeUpdate(new DropTable(table)); - - dataContext.refreshSchemas(); - - assertNull(dataContext.getTableByQualifiedLabel(table.getName())); } @Test - public void testDetectOutsideChanges() throws Exception { - ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext; + public void testDeleteFromWithWhere() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final String tableName = "testCreateTableDelete"; + final CreateTable createTable = new CreateTable(schema, tableName); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.DOUBLE); + dataContext.executeUpdate(createTable); - // Create the type in ES - final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices(); - final String tableType = "outsideTable"; + final Table table = schema.getTableByName(tableName); - Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" }; + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); - new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties) - .execute().actionGet(); + dataContext.executeUpdate(new DeleteFrom(table).where("bar").eq(42)); - dataContext.refreshSchemas(); + final Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() + .toQuery()); - assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType)); + assertEquals("Row[values=[1]]", row.toString()); - new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet(); - dataContext.refreshSchemas(); - assertNull(dataContext.getTableByQualifiedLabel(tableType)); } @Test - public void testDeleteAll() throws Exception { + public void testDeleteNoWhere() throws Exception { final Schema schema = dataContext.getDefaultSchema(); final CreateTable createTable = new CreateTable(schema, "testCreateTable"); createTable.withColumn("foo").ofType(ColumnType.STRING); - createTable.withColumn("bar").ofType(ColumnType.NUMBER); + createTable.withColumn("bar").ofType(ColumnType.DOUBLE); dataContext.executeUpdate(createTable); final Table table = schema.getTableByName("testCreateTable"); @@ -288,8 +279,6 @@ public class ElasticSearchDataContextTest { Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() .toQuery()); assertEquals("Row[values=[0]]", row.toString()); - - dataContext.executeUpdate(new DropTable(table)); } @Test @@ -315,8 +304,6 @@ public class ElasticSearchDataContextTest { Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).select("foo", "bar").toQuery()); assertEquals("Row[values=[world, 43]]", row.toString()); - - dataContext.executeUpdate(new DropTable(table)); } @Test @@ -328,27 +315,22 @@ public class ElasticSearchDataContextTest { dataContext.executeUpdate(createTable); final Table table = schema.getTableByName("testCreateTable"); - try { - dataContext.executeUpdate(new UpdateScript() { - @Override - public void run(UpdateCallback callback) { - callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); - callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); - } - }); - - // greater than is not yet supported - try { - dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); - fail("Exception expected"); - } catch (UnsupportedOperationException e) { - assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", - e.getMessage()); + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); } + }); - } finally { - dataContext.executeUpdate(new DropTable(table)); + // greater than is not yet supported + try { + dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); + fail("Exception expected"); + } catch (UnsupportedOperationException e) { + assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", + e.getMessage()); } } @@ -361,54 +343,24 @@ public class ElasticSearchDataContextTest { dataContext.executeUpdate(createTable); final Table table = schema.getTableByName("testCreateTable"); - try { - dataContext.executeUpdate(new UpdateScript() { - @Override - public void run(UpdateCallback callback) { - callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); - callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); - } - }); - - dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42)); - - DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); - assertTrue(dataSet.next()); - assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); - assertTrue(dataSet.next()); - assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); - assertFalse(dataSet.next()); - dataSet.close(); - } finally { - dataContext.executeUpdate(new DropTable(table)); - } - } - - @Test - public void testDropTable() throws Exception { - Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); - // assert that the table was there to begin with - { - DataSet ds = dataContext.query().from(table).selectCount().execute(); - ds.next(); - assertEquals("Row[values=[9]]", ds.getRow().toString()); - ds.close(); - } + dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42)); - dataContext.executeUpdate(new DropTable(table)); - try { - DataSet ds = dataContext.query().from(table).selectCount().execute(); - ds.next(); - assertEquals("Row[values=[0]]", ds.getRow().toString()); - ds.close(); - } finally { - // restore the people documents for the next tests - insertPeopleDocuments(); - client.admin().indices().prepareRefresh().execute().actionGet(); - dataContext = new ElasticSearchDataContext(client, indexName); - } + DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); + assertTrue(dataSet.next()); + assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); + assertTrue(dataSet.next()); + assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); + assertFalse(dataSet.next()); + dataSet.close(); } @Test @@ -549,26 +501,19 @@ public class ElasticSearchDataContextTest { @Test public void testNonDynamicMapingTableNames() throws Exception { - createIndex(); + CreateIndexRequest cir = new CreateIndexRequest(indexName2); + client.admin().indices().create(cir).actionGet(); + + PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping, XContentType.JSON); + + client.admin().indices().putMapping(pmr).actionGet(); ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2); assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray())); } - private static void createIndex() { - CreateIndexRequest cir = new CreateIndexRequest(indexName2); - CreateIndexResponse response = client.admin().indices().create(cir).actionGet(); - - System.out.println("create index: " + response.isAcknowledged()); - - PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping); - - PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet(); - System.out.println("put mapping: " + response2.isAcknowledged()); - } - - private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) { + private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) { BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < numberOfDocuments; i++) { @@ -578,17 +523,17 @@ public class ElasticSearchDataContextTest { bulkRequest.execute().actionGet(); } - private static void indexTweeterDocument(String indexType, int id, Date date) { - client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date)) - .setId("tweet_" + indexType + "_" + id).execute().actionGet(); + private void indexTweeterDocument(String indexType, int id, Date date) { + final String id1 = "tweet_" + indexType + "_" + id; + client.prepareIndex(indexName, indexType, id1).setSource(buildTweeterJson(id, date)).execute().actionGet(); } - private static void indexTweeterDocument(String indexType, int id) { + private void indexTweeterDocument(String indexType, int id) { client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)) .setId("tweet_" + indexType + "_" + id).execute().actionGet(); } - private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException { + private void indexOnePeopleDocument(String gender, int age, int id) throws IOException { client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute() .actionGet(); }
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java index 8b5eb50..e08f715 100644 --- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java @@ -24,6 +24,7 @@ import java.util.Map; import junit.framework.TestCase; import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser; import org.apache.metamodel.schema.ColumnType; import org.elasticsearch.common.collect.MapBuilder; http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java deleted file mode 100644 index 9ffc6b8..0000000 --- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.nativeclient; - -import junit.framework.TestCase; -import org.apache.metamodel.data.DataSetHeader; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.data.SimpleDataSetHeader; -import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.ColumnType; -import org.apache.metamodel.schema.MutableColumn; - -import java.util.*; - -public class ElasticSearchUtilsTest extends TestCase { - - public void testAssignDocumentIdForPrimaryKeys() throws Exception { - MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true); - SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn); - List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem); - String documentId = "doc1"; - DataSetHeader header = new SimpleDataSetHeader(selectItems1); - Map<String, Object> values = new HashMap<>(); - values.put("value1", "theValue"); - Row row = NativeElasticSearchUtils.createRow(values, documentId, header); - String primaryKeyValue = (String) row.getValue(primaryKeyItem); - - assertEquals(primaryKeyValue, documentId); - } - - public void testCreateRowWithParsableDates() throws Exception { - SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING)); - SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE)); - List<SelectItem> selectItems1 = Arrays.asList(item1, item2); - String documentId = "doc1"; - DataSetHeader header = new SimpleDataSetHeader(selectItems1); - Map<String, Object> values = new HashMap<>(); - values.put("value1", "theValue"); - values.put("value2", "2013-01-04T15:55:51.217+01:00"); - Row row = NativeElasticSearchUtils.createRow(values, documentId, header); - Object stringValue = row.getValue(item1); - Object dateValue = row.getValue(item2); - - assertTrue(stringValue instanceof String); - assertTrue(dateValue instanceof Date); - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java deleted file mode 100644 index b94d0ab..0000000 --- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.nativeclient.utils; - -import org.apache.commons.io.FileUtils; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.node.Node; - -import java.io.File; -import java.io.IOException; - -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - -public class EmbeddedElasticsearchServer { - - private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data"; - - private final Node node; - private final String dataDirectory; - - public EmbeddedElasticsearchServer() { - this(DEFAULT_DATA_DIRECTORY); - } - - public EmbeddedElasticsearchServer(String dataDirectory) { - this.dataDirectory = dataDirectory; - - ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder() - .put("http.enabled", "true") - .put("path.data", dataDirectory); - - node = nodeBuilder() - .local(true) - .settings(elasticsearchSettings.build()) - .node(); - } - - public Client getClient() { - return node.client(); - } - - public void shutdown() { - node.close(); - deleteDataDirectory(); - } - - private void deleteDataDirectory() { - try { - FileUtils.deleteDirectory(new File(dataDirectory)); - } catch (IOException e) { - throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index 9a3b1d8..46f930c 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -21,7 +21,7 @@ <name>MetaModel module for Elasticsearch</name> <properties> - <elasticsearch.version>1.4.4</elasticsearch.version> + <elasticsearch.version>5.6.3</elasticsearch.version> </properties> <modules> http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml index 936cf4c..8f556e6 100644 --- a/elasticsearch/rest/pom.xml +++ b/elasticsearch/rest/pom.xml @@ -26,11 +26,6 @@ under the License. <modelVersion>4.0.0</modelVersion> - <properties> - <jest.version>2.0.2</jest.version> - <elasticsearch.version>1.4.4</elasticsearch.version> - </properties> - <artifactId>MetaModel-elasticsearch-rest</artifactId> <name>MetaModel module for ElasticSearch via REST client</name> @@ -52,34 +47,28 @@ under the License. <artifactId>commons-io</artifactId> </dependency> - <!-- Jest --> - <dependency> - <groupId>io.searchbox</groupId> - <artifactId>jest</artifactId> - <version>${jest.version}</version> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </dependency> <!-- elasticsearch --> <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> </dependency> <!-- test --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> + <dependency><!-- required by elasticsearch --> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.9.1</version> <scope>test</scope> </dependency> <dependency> @@ -87,5 +76,84 @@ under the License. <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> + <profiles> + <profile> + <id>integration-test</id> + <build> + <plugins> + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.19.1</version> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + <version>${docker-maven-plugin.version}</version> + <configuration> + <logDate>default</logDate> + <autoPull>true</autoPull> + <images> + <image> + <name>elasticsearch-metamodel</name> + <build> + <dockerFileDir>${project.build.directory}/test-classes</dockerFileDir> + </build> + <run> + <ports> + <port>9200:9200</port> + <port>9300:9300</port> + </ports> + <env> + <ES_JAVA_OPTS>-Xms1g -Xmx1g</ES_JAVA_OPTS> + <cluster.name>docker-cluster</cluster.name> + <bootstrap.memory_lock>true</bootstrap.memory_lock> + <xpack.security.enabled>false</xpack.security.enabled> + </env> + <wait> + <url>http://${docker.host.address}:9200</url> + <time>300000</time> + </wait> + </run> + </image> + </images> + </configuration> + <executions> + <execution> + <id>start</id> + <phase>pre-integration-test</phase> + <goals> + <goal>build</goal> + <goal>start</goal> + </goals> + </execution> + <execution> + <id>stop</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java new file mode 100644 index 0000000..ddd7e17 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import static java.util.Collections.emptySet; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.http.Header; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticSearchRestClient extends RestHighLevelClient { + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class); + + public ElasticSearchRestClient(final RestClient restClient) { + super(restClient); + } + + public final boolean refresh(final String indexName, final Header... headers) { + try { + return performRequest(new MainRequest(), request -> refresh(indexName), + ElasticSearchRestClient::convertResponse, emptySet(), headers); + } catch (IOException e) { + logger.info("Failed to refresh index \"{}\"", indexName, e); + } + return false; + } + + private static Request refresh(final String indexName) { + return new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_refresh", Collections.emptyMap(), null); + } + + public final boolean delete(final String indexName, final Header... headers) throws IOException { + return performRequest(new MainRequest(), request -> delete(indexName), + ElasticSearchRestClient::convertResponse, emptySet(), headers); + } + + private static Request delete(final String indexName) { + return new Request(HttpDelete.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null); + } + + public Set<Entry<String, Object>> getMappings(final String indexName, final Header... headers) throws IOException { + return performRequestAndParseEntity(new GetIndexRequest(), request -> getMappings(indexName), ( + response) -> parseMappings(response, indexName), emptySet(), headers); + } + + private static Request getMappings(final String indexName) { + return new Request(HttpGet.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null); + } + + public final boolean createMapping(final PutMappingRequest putMappingRequest, final Header... headers) + throws IOException { + return performRequest(putMappingRequest, request -> putMapping(putMappingRequest), + ElasticSearchRestClient::convertResponse, emptySet(), headers); + } + + private static Request putMapping(final PutMappingRequest putMappingRequest) { + final String endpoint = "/" + putMappingRequest.indices()[0] + "/_mapping/" + putMappingRequest.type(); + final ByteArrayEntity entity = new ByteArrayEntity(putMappingRequest.source().getBytes(), + ContentType.APPLICATION_JSON); + return new Request(HttpPut.METHOD_NAME, endpoint, Collections.emptyMap(), entity); + } + + // Carbon copy of RestHighLevelClient#convertExistsResponse(Response) method, which is unaccessible from this class. + private static boolean convertResponse(final Response response) { + return response.getStatusLine().getStatusCode() == 200; + } + + @SuppressWarnings("unchecked") + static Set<Entry<String, Object>> parseMappings(final XContentParser response, final String indexName) throws IOException { + Map<String, Object> schema = (Map<String, Object>) response.map().get(indexName); + Map<String, Object> tables = (Map<String, Object>) schema.get("mappings"); + + return tables.entrySet(); + } + + ActionResponse execute(final ActionRequest request) throws IOException { + if (request instanceof BulkRequest) { + return bulk((BulkRequest) request); + } else if (request instanceof IndexRequest) { + return index((IndexRequest) request); + } else if (request instanceof DeleteRequest) { + return delete((DeleteRequest) request); + } else if (request instanceof ClearScrollRequest) { + return clearScroll((ClearScrollRequest) request); + } else if (request instanceof SearchScrollRequest) { + return searchScroll((SearchScrollRequest) request); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java new file mode 100644 index 0000000..91842f5 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.util.Map; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.create.AbstractTableCreationBuilder; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; + +final class ElasticSearchRestCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchRestUpdateCallback> { + + public ElasticSearchRestCreateTableBuilder(final ElasticSearchRestUpdateCallback updateCallback, + final Schema schema, final String name) { + super(updateCallback, schema, name); + } + + @Override + public Table execute() throws MetaModelException { + final MutableTable table = getTable(); + final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table); + + final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext(); + final String indexName = dataContext.getIndexName(); + + final PutMappingRequest putMapping = new PutMappingRequest(indexName).type(table.getName()).source(source); + getUpdateCallback().execute(putMapping); + + final MutableSchema schema = (MutableSchema) getSchema(); + schema.addTable(table); + return table; + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java index c5a5696..5b32d14 100644 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java @@ -18,57 +18,43 @@ */ package org.apache.metamodel.elasticsearch.rest; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; import org.apache.metamodel.BatchUpdateScript; import org.apache.metamodel.DataContext; import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.QueryPostprocessDataContext; import org.apache.metamodel.UpdateScript; import org.apache.metamodel.UpdateSummary; -import org.apache.metamodel.UpdateableDataContext; import org.apache.metamodel.data.DataSet; import org.apache.metamodel.data.DataSetHeader; import org.apache.metamodel.data.Row; import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext; import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser; import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; import org.apache.metamodel.query.FilterItem; import org.apache.metamodel.query.LogicalOperator; import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.MutableColumn; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.MutableTable; -import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; import org.apache.metamodel.util.SimpleTableDef; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - -import io.searchbox.client.JestClient; -import io.searchbox.client.JestResult; -import io.searchbox.core.Count; -import io.searchbox.core.CountResult; -import io.searchbox.core.Get; -import io.searchbox.core.Search; -import io.searchbox.core.SearchResult; -import io.searchbox.indices.mapping.GetMapping; -import io.searchbox.params.Parameters; - /** * DataContext implementation for ElasticSearch analytics engine. * @@ -86,28 +72,14 @@ import io.searchbox.params.Parameters; * This implementation supports either automatic discovery of a schema or manual * specification of a schema, through the {@link SimpleTableDef} class. */ -public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext, - UpdateableDataContext { +public class ElasticSearchRestDataContext extends AbstractElasticSearchDataContext { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class); - public static final String FIELD_ID = "_id"; - - // 1 minute timeout - public static final String TIMEOUT_SCROLL = "1m"; - // we scroll when more than 400 rows are expected private static final int SCROLL_THRESHOLD = 400; - private final JestClient elasticSearchClient; - - private final String indexName; - // Table definitions that are set from the beginning, not supposed to be - // changed. - private final List<SimpleTableDef> staticTableDefinitions; - - // Table definitions that are discovered, these can change - private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); + private final ElasticSearchRestClient elasticSearchClient; /** * Constructs a {@link ElasticSearchRestDataContext}. This constructor @@ -122,18 +94,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im * an array of {@link SimpleTableDef}s, which define the table * and column model of the ElasticSearch index. */ - public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) { - super(false); + public ElasticSearchRestDataContext(final ElasticSearchRestClient client, final String indexName, + final SimpleTableDef... tableDefinitions) { + super(indexName, tableDefinitions); + if (client == null) { throw new IllegalArgumentException("ElasticSearch Client cannot be null"); } - if (indexName == null || indexName.trim().length() == 0) { - throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName); - } this.elasticSearchClient = client; - this.indexName = indexName; - this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections - .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions)); this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); } @@ -147,65 +115,51 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im * @param indexName * the name of the ElasticSearch index to represent */ - public ElasticSearchRestDataContext(JestClient client, String indexName) { + public ElasticSearchRestDataContext(final ElasticSearchRestClient client, String indexName) { this(client, indexName, new SimpleTableDef[0]); } - /** - * Performs an analysis of the available indexes in an ElasticSearch cluster - * {@link JestClient} instance and detects the elasticsearch types structure - * based on the metadata provided by the ElasticSearch java client. - * - * @see {@link #detectTable(JsonObject, String)} - * @return a mutable schema instance, useful for further fine tuning by the - * user. - */ - private SimpleTableDef[] detectSchema() { + @Override + protected SimpleTableDef[] detectSchema() { logger.info("Detecting schema for index '{}'", indexName); - final JestResult jestResult; + final Set<Entry<String, Object>> mappings; try { - final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build(); - jestResult = elasticSearchClient.execute(getMapping); - } catch (Exception e) { + mappings = getElasticSearchClient().getMappings(indexName); + } catch (IOException e) { logger.error("Failed to retrieve mappings", e); throw new MetaModelException("Failed to execute request for index information needed to detect schema", e); } - if (!jestResult.isSucceeded()) { - logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage()); - throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage()); - } - final List<SimpleTableDef> result = new ArrayList<>(); - final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName) - .getAsJsonObject("mappings").entrySet(); - if (mappings.size() == 0) { + if (mappings.isEmpty()) { logger.warn("No metadata returned for index name '{}' - no tables will be detected."); } else { + for (Entry<String, Object> mapping : mappings) { + final String documentType = mapping.getKey(); - for (Map.Entry<String, JsonElement> entry : mappings) { - final String documentType = entry.getKey(); + @SuppressWarnings("unchecked") + Map<String, Object> mappingConfiguration = (Map<String, Object>) mapping.getValue(); + @SuppressWarnings("unchecked") + Map<String, Object> properties = (Map<String, Object>) mappingConfiguration.get("properties"); try { - final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties") - .getAsJsonObject(), documentType); + final SimpleTableDef table = detectTable(properties, documentType); result.add(table); } catch (Exception e) { logger.error("Unexpected error during detectTable for document type '{}'", documentType, e); } } } - final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]); - Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() { - @Override - public int compare(SimpleTableDef o1, SimpleTableDef o2) { - return o1.getName().compareTo(o2.getName()); - } - }); + return sortTables(result); + } + + @Override + protected void onSchemaCacheRefreshed() { + getElasticSearchClient().refresh(indexName); - return tableDefArray; + detectSchema(); } /** @@ -219,60 +173,22 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im * the name of the index type * @return a table definition for ElasticSearch. */ - private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) { - final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties); + private static SimpleTableDef detectTable(final Map<String, Object> metadataProperties, final String documentType) { + final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataProperties); return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes()); } @Override - protected Schema getMainSchema() throws MetaModelException { - final MutableSchema theSchema = new MutableSchema(getMainSchemaName()); - for (final SimpleTableDef tableDef : staticTableDefinitions) { - addTable(theSchema, tableDef); - } - - final SimpleTableDef[] tables = detectSchema(); - synchronized (this) { - dynamicTableDefinitions.clear(); - dynamicTableDefinitions.addAll(Arrays.asList(tables)); - for (final SimpleTableDef tableDef : dynamicTableDefinitions) { - final List<String> tableNames = theSchema.getTableNames(); - - if (!tableNames.contains(tableDef.getName())) { - addTable(theSchema, tableDef); - } - } - } - - return theSchema; - } - - private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) { - final MutableTable table = tableDef.toTable().setSchema(theSchema); - final Column idColumn = table.getColumnByName(FIELD_ID); - if (idColumn != null && idColumn instanceof MutableColumn) { - final MutableColumn mutableColumn = (MutableColumn) idColumn; - mutableColumn.setPrimaryKey(true); - } - theSchema.addTable(table); - } - - @Override - protected String getMainSchemaName() throws MetaModelException { - return indexName; - } - - @Override - protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, - List<FilterItem> whereItems, int firstRow, int maxRows) { + protected DataSet materializeMainSchemaTable(final Table table, final List<SelectItem> selectItems, + final List<FilterItem> whereItems, final int firstRow, final int maxRows) { final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND); if (queryBuilder != null) { // where clause can be pushed down to an ElasticSearch query SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder); - SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows)); + SearchResponse result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows)); - return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems); + return new ElasticSearchRestDataSet(getElasticSearchClient(), result, selectItems); } return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); } @@ -282,30 +198,30 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD; } - private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) { - Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType( - table.getName()); + private SearchResponse executeSearch(final Table table, final SearchSourceBuilder searchSourceBuilder, + final boolean scroll) { + final SearchRequest searchRequest = new SearchRequest(new String[] { getIndexName() }, searchSourceBuilder) + .types(table.getName()); + if (scroll) { - builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL); + searchRequest.scroll(TIMEOUT_SCROLL); } - Search search = builder.build(); - SearchResult result; try { - result = elasticSearchClient.execute(search); - } catch (Exception e) { + return getElasticSearchClient().search(searchRequest); + } catch (IOException e) { logger.warn("Could not execute ElasticSearch query", e); throw new MetaModelException("Could not execute ElasticSearch query", e); } - return result; } @Override protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) { - SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded( + SearchResponse searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded( maxRows)); - return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList())); + return new ElasticSearchRestDataSet(getElasticSearchClient(), searchResult, columns.stream() + .map(SelectItem::new).collect(Collectors.toList())); } private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) { @@ -317,7 +233,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im if (limitMaxRowsIsSet(maxRows)) { searchRequest.size(maxRows); } else { - searchRequest.size(Integer.MAX_VALUE); + searchRequest.size(SCROLL_THRESHOLD); } if (queryBuilder != null) { @@ -337,12 +253,16 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im final String documentType = table.getName(); final String id = keyValue.toString(); - final Get get = new Get.Builder(indexName, id).type(documentType).build(); - final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get); - final DataSetHeader header = new SimpleDataSetHeader(selectItems); - return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header); + try { + return ElasticSearchUtils.createRow(getElasticSearchClient() + .get(new GetRequest(getIndexName(), documentType, id)) + .getSource(), id, header); + } catch (IOException e) { + logger.warn("Could not execute ElasticSearch query", e); + throw new MetaModelException("Could not execute ElasticSearch query", e); + } } @Override @@ -352,30 +272,23 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im return null; } final String documentType = table.getName(); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.termQuery("_type", documentType)); + sourceBuilder.size(0); - Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build(); - - CountResult countResult; try { - countResult = elasticSearchClient.execute(count); + return getElasticSearchClient().search(new SearchRequest(new String[] { getIndexName() }, sourceBuilder)) + .getHits().getTotalHits(); } catch (Exception e) { logger.warn("Could not execute ElasticSearch get query", e); throw new MetaModelException("Could not execute ElasticSearch get query", e); } - - return countResult.getCount(); - } - - private boolean limitMaxRowsIsSet(int maxRows) { - return (maxRows != -1); } @Override public UpdateSummary executeUpdate(UpdateScript update) { final boolean isBatch = update instanceof BatchUpdateScript; - final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch); + final ElasticSearchRestUpdateCallback callback = new ElasticSearchRestUpdateCallback(this, isBatch); update.run(callback); callback.onExecuteUpdateFinished(); return callback.getUpdateSummary(); @@ -384,14 +297,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im /** * Gets the {@link JestClient} that this {@link DataContext} is wrapping. */ - public JestClient getElasticSearchClient() { + public ElasticSearchRestClient getElasticSearchClient() { return elasticSearchClient; } - - /** - * Gets the name of the index that this {@link DataContext} is working on. - */ - public String getIndexName() { - return indexName; - } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java index b2dc4c3..b1756b7 100644 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java @@ -18,6 +18,14 @@ */ package org.apache.metamodel.elasticsearch.rest; +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.metamodel.ConnectionException; import org.apache.metamodel.DataContext; import org.apache.metamodel.factory.DataContextFactory; @@ -25,10 +33,8 @@ import org.apache.metamodel.factory.DataContextProperties; import org.apache.metamodel.factory.ResourceFactoryRegistry; import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException; import org.apache.metamodel.util.SimpleTableDef; - -import io.searchbox.client.JestClient; -import io.searchbox.client.JestClientFactory; -import io.searchbox.client.config.HttpClientConfig; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; /** * Factory for ElasticSearch data context of REST type. @@ -72,18 +78,20 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory { return true; } - private JestClient createClient(DataContextProperties properties) { - final String serverUri = properties.getUrl(); - final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri); + private ElasticSearchRestClient createClient(final DataContextProperties properties) throws MalformedURLException { + final URL url = new URL(properties.getUrl()); + final RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort())); + if (properties.getUsername() != null) { - builder.defaultCredentials(properties.getUsername(), properties.getPassword()); + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(), + properties.getPassword())); + + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider( + credentialsProvider)); } - final JestClientFactory clientFactory = new JestClientFactory(); - final HttpClientConfig httpClientConfig = new HttpClientConfig(builder); - clientFactory.setHttpClientConfig(httpClientConfig); - final JestClient client = clientFactory.getObject(); - return client; + return new ElasticSearchRestClient(builder.build()); } private String getIndex(DataContextProperties properties) { @@ -97,10 +105,14 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory { @Override public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) throws UnsupportedDataContextPropertiesException, ConnectionException { - final JestClient client = createClient(properties); - final String indexName = getIndex(properties); - final SimpleTableDef[] tableDefinitions = properties.getTableDefs(); - return new ElasticSearchRestDataContext(client, indexName, tableDefinitions); + try { + ElasticSearchRestClient client = createClient(properties); + final String indexName = getIndex(properties); + final SimpleTableDef[] tableDefinitions = properties.getTableDefs(); + return new ElasticSearchRestDataContext(client, indexName, tableDefinitions); + } catch (MalformedURLException e) { + throw new UnsupportedDataContextPropertiesException(e); + } } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java new file mode 100644 index 0000000..d79b271 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.io.IOException; +import java.util.List; + +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext; +import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet; +import org.apache.metamodel.query.SelectItem; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link DataSet} implementation for ElasticSearch + */ +final class ElasticSearchRestDataSet extends AbstractElasticSearchDataSet { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataSet.class); + + private final ElasticSearchRestClient _client; + + public ElasticSearchRestDataSet(final ElasticSearchRestClient client, final SearchResponse searchResponse, final List<SelectItem> selectItems) { + super(searchResponse, selectItems); + _client = client; + } + + @Override + public void closeNow() { + final String scrollId = _searchResponse.getScrollId(); + final ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + try { + _client.execute(clearScrollRequest); + } catch (IOException e) { + logger.warn("Could not clear scroll.", e); + } + } + + @Override + protected SearchResponse scrollSearchResponse(final String scrollId) throws IOException { + return _client.searchScroll(new SearchScrollRequest(scrollId).scroll( + AbstractElasticSearchDataContext.TIMEOUT_SCROLL)); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java new file mode 100644 index 0000000..f8caa2d --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.delete.AbstractRowDeletionBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +/** + * {@link RowDeletionBuilder} implementation for + * {@link ElasticSearchRestDataContext}. + */ +final class ElasticSearchRestDeleteBuilder extends AbstractRowDeletionBuilder { + private final ElasticSearchRestUpdateCallback _updateCallback; + + public ElasticSearchRestDeleteBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + final Table table = getTable(); + final String documentType = table.getName(); + + final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext(); + final String indexName = dataContext.getIndexName(); + + final List<FilterItem> whereItems = getWhereItems(); + + // delete by query - note that creteQueryBuilderForSimpleWhere may + // return matchAllQuery() if no where items are present. + final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, + LogicalOperator.AND); + if (queryBuilder == null) { + // TODO: The where items could not be pushed down to a query. We + // could solve this by running a query first, gather all + // document IDs and then delete by IDs. + throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " + + whereItems); + } + + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(documentType); + searchRequest.source(searchSourceBuilder); + + try { + final SearchResponse response = dataContext.getElasticSearchClient().search(searchRequest); + + final Iterator<SearchHit> iterator = response.getHits().iterator(); + while (iterator.hasNext()) { + final SearchHit hit = iterator.next(); + final String typeId = hit.getId(); + + DeleteRequest deleteRequest = new DeleteRequest(indexName, documentType, typeId); + + _updateCallback.execute(deleteRequest); + } + } catch (IOException e) { + throw new MetaModelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java new file mode 100644 index 0000000..0ba4f66 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.insert.AbstractRowInsertionBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.index.IndexRequest; + +final class ElasticSearchRestInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchRestUpdateCallback> { + + public ElasticSearchRestInsertBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) { + super(updateCallback, table); + } + + @Override + public void execute() throws MetaModelException { + final ElasticSearchRestUpdateCallback updateCallback = getUpdateCallback(); + final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext(); + final String indexName = dataContext.getIndexName(); + final String documentType = getTable().getName(); + + final Map<String, Object> source = new HashMap<>(); + final Column[] columns = getColumns(); + final Object[] values = getValues(); + String id = null; + for (int i = 0; i < columns.length; i++) { + if (isSet(columns[i])) { + final String columnName = columns[i].getName(); + + final Object value = values[i]; + if (ElasticSearchUtils.FIELD_ID.equals(columnName)) { + if (value != null) { + id = value.toString(); + } + } else { + final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName); + source.put(fieldName, value); + } + } + } + + assert !source.isEmpty(); + + IndexRequest indexRequest = new IndexRequest(indexName, documentType, id); + indexRequest.source(source); + + getUpdateCallback().execute(indexRequest); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java new file mode 100644 index 0000000..defd18f --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.io.IOException; + +import org.apache.metamodel.AbstractUpdateCallback; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.create.TableCreationBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.drop.TableDropBuilder; +import org.apache.metamodel.insert.RowInsertionBuilder; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link UpdateCallback} implementation for + * {@link ElasticSearchRestDataContext}. + */ +final class ElasticSearchRestUpdateCallback extends AbstractUpdateCallback { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestUpdateCallback.class); + + private static final int BULK_BUFFER_SIZE = 1000; + + private BulkRequest bulkRequest; + private int bulkActionCount = 0; + private final boolean isBatch; + + public ElasticSearchRestUpdateCallback(final ElasticSearchRestDataContext dataContext, final boolean isBatch) { + super(dataContext); + this.isBatch = isBatch; + } + + private boolean isBatch() { + return isBatch; + } + + @Override + public ElasticSearchRestDataContext getDataContext() { + return (ElasticSearchRestDataContext) super.getDataContext(); + } + + @Override + public TableCreationBuilder createTable(final Schema schema, final String name) throws IllegalArgumentException, + IllegalStateException { + return new ElasticSearchRestCreateTableBuilder(this, schema, name); + } + + @Override + public boolean isDropTableSupported() { + return false; + } + + @Override + public TableDropBuilder dropTable(final Table table) { + throw new UnsupportedOperationException(); + } + + @Override + public RowInsertionBuilder insertInto(final Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new ElasticSearchRestInsertBuilder(this, table); + } + + @Override + public boolean isDeleteSupported() { + return true; + } + + @Override + public RowDeletionBuilder deleteFrom(final Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new ElasticSearchRestDeleteBuilder(this, table); + } + + public void onExecuteUpdateFinished() { + if (isBatch()) { + flushBulkActions(); + } + + getDataContext().refreshSchemas(); + } + + private void flushBulkActions() { + if (bulkRequest == null || bulkActionCount == 0) { + // nothing to flush + return; + } + + logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName()); + executeBlocking(bulkRequest); + + bulkActionCount = 0; + bulkRequest = null; + } + + public void execute(final ActionRequest action) { + if (isBatch() && (action instanceof DocWriteRequest<?>)) { + getBulkRequest().add((DocWriteRequest<?>) action); + bulkActionCount++; + if (bulkActionCount == BULK_BUFFER_SIZE) { + flushBulkActions(); + } + } else { + executeBlocking(action); + } + } + + private void executeBlocking(final ActionRequest action) { + try { + if (action instanceof PutMappingRequest) { + getDataContext().getElasticSearchClient().createMapping((PutMappingRequest) action); + } else { + final ActionResponse result = getDataContext().getElasticSearchClient().execute(action); + + if (result instanceof BulkResponse && ((BulkResponse) result).hasFailures()) { + BulkItemResponse[] failedItems = ((BulkResponse) result).getItems(); + for (int i = 0; i < failedItems.length; i++) { + if (failedItems[i].isFailed()) { + final BulkItemResponse failedItem = failedItems[i]; + logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i + 1, + failedItems.length, failedItem.getId(), failedItem.getOpType(), failedItem.status(), + failedItem.getFailureMessage()); + } + } + } + } + } catch (IOException e) { + logger.warn("Could not execute command {} ", action, e); + throw new MetaModelException("Could not execute " + action, e); + } + } + + private BulkRequest getBulkRequest() { + if (bulkRequest == null) { + bulkRequest = new BulkRequest(); + } + return bulkRequest; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java deleted file mode 100644 index 1bb026d..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import io.searchbox.action.Action; -import io.searchbox.client.JestClient; -import io.searchbox.client.JestResult; -import org.apache.metamodel.MetaModelException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -final class JestClientExecutor { - private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class); - - static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) { - return execute(jestClient, clientRequest, true); - } - - static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) { - try { - final T result = jestClient.execute(clientRequest); - logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded()); - return result; - } catch (IOException e) { - logger.warn("Could not execute command {} ", clientRequest, e); - if (doThrow) { - throw new MetaModelException("Could not execute command " + clientRequest, e); - } - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java deleted file mode 100644 index cc42b07..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import io.searchbox.action.GenericResultAbstractAction; - -public class JestDeleteScroll extends GenericResultAbstractAction { - private JestDeleteScroll(Builder builder) { - super(builder); - this.payload = builder.getScrollId(); - setURI(buildURI()); - } - - @Override - public String getRestMethodName() { - return "DELETE"; - } - - @Override - protected String buildURI() { - return super.buildURI() + "/_search/scroll"; - } - - public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> { - private final String scrollId; - - public Builder(String scrollId) { - this.scrollId = scrollId; - } - - @Override - public JestDeleteScroll build() { - return new JestDeleteScroll(this); - } - - public String getScrollId() { - return scrollId; - } - } - -}
