http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java new file mode 100644 index 0000000..0ae170c --- /dev/null +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java @@ -0,0 +1,606 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.swing.table.TableModel; + +import org.apache.metamodel.MetaModelHelper; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.create.CreateTable; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.DataSetTableModel; +import org.apache.metamodel.data.InMemoryDataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.delete.DeleteFrom; +import org.apache.metamodel.drop.DropTable; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer; +import org.apache.metamodel.query.FunctionType; +import org.apache.metamodel.query.Query; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.Update; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.*; + +public class ElasticSearchDataContextTest { + + private static final String indexName = "twitter"; + private static final String indexType1 = "tweet1"; + private static final String indexType2 = "tweet2"; + private static final String indexName2 = "twitter2"; + private static final String indexType3 = "tweet3"; + private static final String bulkIndexType = "bulktype"; + private static final String peopleIndexType = "peopletype"; + private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}"; + private static EmbeddedElasticsearchServer embeddedElasticsearchServer; + private static Client client; + private static UpdateableDataContext dataContext; + + @BeforeClass + public static void beforeTests() throws Exception { + embeddedElasticsearchServer = new EmbeddedElasticsearchServer(); + client = embeddedElasticsearchServer.getClient(); + indexTweeterDocument(indexType1, 1); + indexTweeterDocument(indexType2, 1); + indexTweeterDocument(indexType2, 2, null); + insertPeopleDocuments(); + indexTweeterDocument(indexType2, 1); + indexBulkDocuments(indexName, bulkIndexType, 10); + + // The refresh API allows to explicitly refresh one or more index, + // making all operations performed since the last refresh available for + // search + embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet(); + dataContext = new ElasticSearchDataContext(client, indexName); + System.out.println("Embedded ElasticSearch server created!"); + } + + private static void insertPeopleDocuments() throws IOException { + indexOnePeopleDocument("female", 20, 5); + indexOnePeopleDocument("female", 17, 8); + indexOnePeopleDocument("female", 18, 9); + indexOnePeopleDocument("female", 19, 10); + indexOnePeopleDocument("female", 20, 11); + indexOnePeopleDocument("male", 19, 1); + indexOnePeopleDocument("male", 17, 2); + indexOnePeopleDocument("male", 18, 3); + indexOnePeopleDocument("male", 18, 4); + } + + @AfterClass + public static void afterTests() { + embeddedElasticsearchServer.shutdown(); + System.out.println("Embedded ElasticSearch server shut down!"); + } + + @Test + public void testSimpleQuery() throws Exception { + assertEquals("[bulktype, peopletype, tweet1, tweet2]", + Arrays.toString(dataContext.getDefaultSchema().getTableNames())); + + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + + assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames())); + + assertEquals(ColumnType.STRING, table.getColumnByName("user").getType()); + assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType()); + assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType()); + + try(DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) { + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user1, 1]]", ds.getRow().toString()); + } + } + + @Test + public void testDocumentIdAsPrimaryKey() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); + Column[] pks = table.getPrimaryKeys(); + assertEquals(1, pks.length); + assertEquals("_id", pks[0].getName()); + + try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) { + assertTrue(ds.next()); + assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString()); + } + } + + @Test + public void testExecutePrimaryKeyLookupQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); + Column[] pks = table.getPrimaryKeys(); + + try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) { + assertTrue(ds.next()); + Object dateValue = ds.getRow().getValue(2); + assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", ds.getRow().toString()); + + assertFalse(ds.next()); + + assertEquals(InMemoryDataSet.class, ds.getClass()); + } + } + + @Test + public void testDateIsHandledAsDate() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + Column column = table.getColumnByName("postDate"); + ColumnType type = column.getType(); + assertEquals(ColumnType.DATE, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date); + } + } + + @Test + public void testNumberIsHandledAsNumber() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Column column = table.getColumnByName("age"); + ColumnType type = column.getType(); + assertEquals(ColumnType.BIGINT, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number); + } + } + + @Test + public void testCreateTableInsertQueryAndDrop() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + assertEquals("[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames())); + + final Column fooColumn = table.getColumnByName("foo"); + final Column idColumn = table.getPrimaryKeys()[0]; + assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]", + idColumn.toString()); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.refreshSchemas(); + + try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) { + assertTrue(ds.next()); + assertEquals("hello", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertTrue(ds.next()); + assertEquals("world", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertFalse(ds.next()); + } + + dataContext.executeUpdate(new DropTable(table)); + + dataContext.refreshSchemas(); + + assertNull(dataContext.getTableByQualifiedLabel(table.getName())); + } + + @Test + public void testDetectOutsideChanges() throws Exception { + ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext; + + // Create the type in ES + final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices(); + final String tableType = "outsideTable"; + + Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" }; + + new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties) + .execute().actionGet(); + + dataContext.refreshSchemas(); + + assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType)); + + new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet(); + dataContext.refreshSchemas(); + assertNull(dataContext.getTableByQualifiedLabel(tableType)); + } + + @Test + public void testDeleteAll() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() + .toQuery()); + assertEquals("Row[values=[0]]", row.toString()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void testDeleteByQuery() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, + dataContext.query().from(table).select("foo", "bar").toQuery()); + assertEquals("Row[values=[world, 43]]", row.toString()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void testDeleteUnsupportedQueryType() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + // greater than is not yet supported + try { + dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); + fail("Exception expected"); + } catch (UnsupportedOperationException e) { + assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", + e.getMessage()); + } + + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test + public void testUpdateRow() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42)); + + DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); + assertTrue(dataSet.next()); + assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); + assertTrue(dataSet.next()); + assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); + assertFalse(dataSet.next()); + dataSet.close(); + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test + public void testDropTable() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + + // assert that the table was there to begin with + { + DataSet ds = dataContext.query().from(table).selectCount().execute(); + ds.next(); + assertEquals("Row[values=[9]]", ds.getRow().toString()); + ds.close(); + } + + dataContext.executeUpdate(new DropTable(table)); + try { + DataSet ds = dataContext.query().from(table).selectCount().execute(); + ds.next(); + assertEquals("Row[values=[0]]", ds.getRow().toString()); + ds.close(); + } finally { + // restore the people documents for the next tests + insertPeopleDocuments(); + client.admin().indices().prepareRefresh().execute().actionGet(); + dataContext = new ElasticSearchDataContext(client, indexName); + } + } + + @Test + public void testWhereColumnEqualsValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").execute()) { + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNullValues() throws Exception { + try(DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNull().execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[2]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNotNullValues() throws Exception { + try(DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNotNull().execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[1]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereMultiColumnsEqualValues() throws Exception { + try(DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").and("message").ne(5).execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnInValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .in("user4", "user5").orderBy("message").execute()) { + assertTrue(ds.next()); + + String row1 = ds.getRow().toString(); + assertEquals("Row[values=[user4, 4]]", row1); + assertTrue(ds.next()); + + String row2 = ds.getRow().toString(); + assertEquals("Row[values=[user5, 5]]", row2); + + assertFalse(ds.next()); + } + } + + @Test + public void testGroupByQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + + Query q = new Query(); + q.from(table); + q.groupBy(table.getColumnByName("gender")); + q.select(new SelectItem(table.getColumnByName("gender")), + new SelectItem(FunctionType.MAX, table.getColumnByName("age")), + new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*", + "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId")); + q.orderBy("gender"); + DataSet data = dataContext.executeQuery(q); + assertEquals( + "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]", + Arrays.toString(data.getSelectItems())); + + assertTrue(data.next()); + assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString()); + assertTrue(data.next()); + assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString()); + assertFalse(data.next()); + } + + @Test + public void testFilterOnNumberColumn() { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery(); + DataSet data = dataContext.executeQuery(q); + String[] expectations = new String[] { "Row[values=[user8]]", "Row[values=[user9]]" }; + + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertFalse(data.next()); + } + + @Test + public void testMaxRows() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Query query = new Query().from(table).select(table.getColumns()).setMaxRows(5); + DataSet dataSet = dataContext.executeQuery(query); + + TableModel tableModel = new DataSetTableModel(dataSet); + assertEquals(5, tableModel.getRowCount()); + } + + @Test + public void testCountQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = new Query().selectCount().from(table); + + List<Object[]> data = dataContext.executeQuery(q).toObjectArrays(); + assertEquals(1, data.size()); + Object[] row = data.get(0); + assertEquals(1, row.length); + assertEquals("[10]", Arrays.toString(row)); + } + + @Test(expected = IllegalArgumentException.class) + public void testQueryForANonExistingTable() throws Exception { + dataContext.query().from("nonExistingTable").select("user").and("message").execute(); + } + + @Test(expected = IllegalArgumentException.class) + public void testQueryForAnExistingTableAndNonExistingField() throws Exception { + indexTweeterDocument(indexType1, 1); + dataContext.query().from(indexType1).select("nonExistingField").execute(); + } + + @Test + public void testNonDynamicMapingTableNames() throws Exception { + createIndex(); + + ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2); + + assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames())); + } + + private static void createIndex() { + CreateIndexRequest cir = new CreateIndexRequest(indexName2); + CreateIndexResponse response = client.admin().indices().create(cir).actionGet(); + + System.out.println("create index: " + response.isAcknowledged()); + + PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping); + + PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet(); + System.out.println("put mapping: " + response2.isAcknowledged()); + } + + private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + for (int i = 0; i < numberOfDocuments; i++) { + bulkRequest.add(client.prepareIndex(indexName, indexType, Integer.toString(i)).setSource( + buildTweeterJson(i))); + } + bulkRequest.execute().actionGet(); + } + + private static void indexTweeterDocument(String indexType, int id, Date date) { + client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); + } + + private static void indexTweeterDocument(String indexType, int id) { + client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); + } + + private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException { + client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute() + .actionGet(); + } + + private static Map<String, Object> buildTweeterJson(int elementId) { + return buildTweeterJson(elementId, new Date()); + } + + private static Map<String, Object> buildTweeterJson(int elementId, Date date) { + Map<String, Object> map = new LinkedHashMap<>(); + map.put("user", "user" + elementId); + map.put("postDate", date); + map.put("message", elementId); + return map; + } + + private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException { + return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject(); + } + +}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java new file mode 100644 index 0000000..8b5eb50 --- /dev/null +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import java.util.LinkedHashMap; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.schema.ColumnType; +import org.elasticsearch.common.collect.MapBuilder; + +public class ElasticSearchMetaDataParserTest extends TestCase { + + public void testParseMetadataInfo() throws Exception { + Map<String, Object> metadata = new LinkedHashMap<>(); + metadata.put("message", MapBuilder.newMapBuilder().put("type", "long").immutableMap()); + metadata.put("postDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap()); + metadata.put("anotherDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap()); + metadata.put("user", MapBuilder.newMapBuilder().put("type", "string").immutableMap()); + metadata.put("critical", MapBuilder.newMapBuilder().put("type", "boolean").immutableMap()); + metadata.put("income", MapBuilder.newMapBuilder().put("type", "double").immutableMap()); + metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", "bar").immutableMap()); + + ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadata); + String[] columnNames = metaData.getColumnNames(); + ColumnType[] columnTypes = metaData.getColumnTypes(); + + assertTrue(columnNames.length == 8); + assertEquals(columnNames[0], "_id"); + assertEquals(columnNames[1], "message"); + assertEquals(columnNames[2], "postDate"); + assertEquals(columnNames[3], "anotherDate"); + assertEquals(columnNames[4], "user"); + assertEquals(columnNames[5], "critical"); + assertEquals(columnNames[6], "income"); + assertEquals(columnNames[7], "untypedthingie"); + + assertTrue(columnTypes.length == 8); + assertEquals(columnTypes[0], ColumnType.STRING); + assertEquals(columnTypes[1], ColumnType.BIGINT); + assertEquals(columnTypes[2], ColumnType.DATE); + assertEquals(columnTypes[3], ColumnType.DATE); + assertEquals(columnTypes[4], ColumnType.STRING); + assertEquals(columnTypes[5], ColumnType.BOOLEAN); + assertEquals(columnTypes[6], ColumnType.DOUBLE); + assertEquals(columnTypes[7], ColumnType.STRING); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java new file mode 100644 index 0000000..9ffc6b8 --- /dev/null +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import junit.framework.TestCase; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.MutableColumn; + +import java.util.*; + +public class ElasticSearchUtilsTest extends TestCase { + + public void testAssignDocumentIdForPrimaryKeys() throws Exception { + MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true); + SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn); + List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + Map<String, Object> values = new HashMap<>(); + values.put("value1", "theValue"); + Row row = NativeElasticSearchUtils.createRow(values, documentId, header); + String primaryKeyValue = (String) row.getValue(primaryKeyItem); + + assertEquals(primaryKeyValue, documentId); + } + + public void testCreateRowWithParsableDates() throws Exception { + SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING)); + SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE)); + List<SelectItem> selectItems1 = Arrays.asList(item1, item2); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + Map<String, Object> values = new HashMap<>(); + values.put("value1", "theValue"); + values.put("value2", "2013-01-04T15:55:51.217+01:00"); + Row row = NativeElasticSearchUtils.createRow(values, documentId, header); + Object stringValue = row.getValue(item1); + Object dateValue = row.getValue(item2); + + assertTrue(stringValue instanceof String); + assertTrue(dateValue instanceof Date); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java new file mode 100644 index 0000000..b94d0ab --- /dev/null +++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient.utils; + +import org.apache.commons.io.FileUtils; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.node.Node; + +import java.io.File; +import java.io.IOException; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +public class EmbeddedElasticsearchServer { + + private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data"; + + private final Node node; + private final String dataDirectory; + + public EmbeddedElasticsearchServer() { + this(DEFAULT_DATA_DIRECTORY); + } + + public EmbeddedElasticsearchServer(String dataDirectory) { + this.dataDirectory = dataDirectory; + + ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder() + .put("http.enabled", "true") + .put("path.data", dataDirectory); + + node = nodeBuilder() + .local(true) + .settings(elasticsearchSettings.build()) + .node(); + } + + public Client getClient() { + return node.client(); + } + + public void shutdown() { + node.close(); + deleteDataDirectory(); + } + + private void deleteDataDirectory() { + try { + FileUtils.deleteDirectory(new File(dataDirectory)); + } catch (IOException e) { + throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index 275b75f..e3ccfb5 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -17,41 +17,16 @@ </parent> <modelVersion>4.0.0</modelVersion> <artifactId>MetaModel-elasticsearch</artifactId> - <name>MetaModel module for Elasticsearch analytics engine</name> + <packaging>pom</packaging> + <name>MetaModel module for Elasticsearch</name> <properties> <elasticsearch.version>1.4.4</elasticsearch.version> </properties> - <dependencies> - <dependency> - <groupId>org.apache.metamodel</groupId> - <artifactId>MetaModel-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - - <!-- elasticsearch --> - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.version}</version> - </dependency> - - <!-- test --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - + <modules> + <module>common</module> + <module>native</module> + <module>rest</module> + </modules> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml new file mode 100644 index 0000000..e4a4a79 --- /dev/null +++ b/elasticsearch/rest/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>MetaModel-elasticsearch</artifactId> + <groupId>org.apache.metamodel</groupId> + <version>4.4.2-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <properties> + <jest.version>0.1.7</jest.version> + <elasticsearch.version>1.4.4</elasticsearch.version> + </properties> + + <artifactId>MetaModel-elasticsearch-rest</artifactId> + <name>MetaModel module for ElasticSearch via REST client</name> + + <dependencies> + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-elasticsearch-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <!-- Jest --> + <dependency> + <groupId>io.searchbox</groupId> + <artifactId>jest</artifactId> + <version>${jest.version}</version> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </dependency> + <!-- elasticsearch --> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <!-- test --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java new file mode 100644 index 0000000..2219b89 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.metamodel.DataContext; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.QueryPostprocessDataContext; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.MutableColumn; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.util.SimpleTableDef; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import io.searchbox.client.JestClient; +import io.searchbox.client.JestResult; +import io.searchbox.core.Count; +import io.searchbox.core.CountResult; +import io.searchbox.core.Get; +import io.searchbox.core.Search; +import io.searchbox.core.SearchResult; +import io.searchbox.indices.mapping.GetMapping; +import io.searchbox.params.Parameters; + +/** + * DataContext implementation for ElasticSearch analytics engine. + * + * ElasticSearch has a data storage structure hierarchy that briefly goes like + * this: + * <ul> + * <li>Index</li> + * <li>Document type (short: Type) (within an index)</li> + * <li>Documents (of a particular type)</li> + * </ul> + * + * When instantiating this DataContext, an index name is provided. Within this + * index, each document type is represented as a table. + * + * This implementation supports either automatic discovery of a schema or manual + * specification of a schema, through the {@link SimpleTableDef} class. + */ +public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class); + + public static final String FIELD_ID = "_id"; + + // 1 minute timeout + public static final String TIMEOUT_SCROLL = "1m"; + + private final JestClient elasticSearchClient; + + private final String indexName; + // Table definitions that are set from the beginning, not supposed to be changed. + private final List<SimpleTableDef> staticTableDefinitions; + + // Table definitions that are discovered, these can change + private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); + + /** + * Constructs a {@link ElasticSearchRestDataContext}. This constructor accepts a + * custom array of {@link SimpleTableDef}s which allows the user to define + * his own view on the indexes in the engine. + * + * @param client + * the ElasticSearch client + * @param indexName + * the name of the ElasticSearch index to represent + * @param tableDefinitions + * an array of {@link SimpleTableDef}s, which define the table + * and column model of the ElasticSearch index. + */ + public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) { + if (client == null) { + throw new IllegalArgumentException("ElasticSearch Client cannot be null"); + } + if (indexName == null || indexName.trim().length() == 0) { + throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName); + } + this.elasticSearchClient = client; + this.indexName = indexName; + this.staticTableDefinitions = Arrays.asList(tableDefinitions); + this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); + } + + /** + * Constructs a {@link ElasticSearchRestDataContext} and automatically detects + * the schema structure/view on all indexes (see + * {@link this.detectSchema(JestClient, String)}). + * + * @param client + * the ElasticSearch client + * @param indexName + * the name of the ElasticSearch index to represent + */ + public ElasticSearchRestDataContext(JestClient client, String indexName) { + this(client, indexName, new SimpleTableDef[0]); + } + + /** + * Performs an analysis of the available indexes in an ElasticSearch cluster + * {@link JestClient} instance and detects the elasticsearch types structure + * based on the metadata provided by the ElasticSearch java client. + * + * @see #detectTable(JsonObject, String) + * @return a mutable schema instance, useful for further fine tuning by the + * user. + */ + private SimpleTableDef[] detectSchema() { + logger.info("Detecting schema for index '{}'", indexName); + + final JestResult jestResult; + try { + final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build(); + jestResult = elasticSearchClient.execute(getMapping); + } catch (Exception e) { + logger.error("Failed to retrieve mappings" , e); + throw new MetaModelException("Failed to execute request for index information needed to detect schema", e); + } + + if(!jestResult.isSucceeded()){ + logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage()); + throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage()); + } + + final List<SimpleTableDef> result = new ArrayList<>(); + + final Set<Map.Entry<String, JsonElement>> mappings = + jestResult.getJsonObject().getAsJsonObject(indexName).getAsJsonObject("mappings").entrySet(); + if(mappings.size() == 0){ + logger.warn("No metadata returned for index name '{}' - no tables will be detected."); + } else { + + for (Map.Entry<String, JsonElement> entry : mappings) { + final String documentType = entry.getKey(); + + try { + final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties").getAsJsonObject(), documentType); + result.add(table); + } catch (Exception e) { + logger.error("Unexpected error during detectTable for document type '{}'", documentType, e); + } + } + } + final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]); + Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() { + @Override + public int compare(SimpleTableDef o1, SimpleTableDef o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + + return tableDefArray; + } + + /** + * Performs an analysis of an available index type in an ElasticSearch + * {@link JestClient} client and tries to detect the index structure based on + * the metadata provided by the java client. + * + * @param metadataProperties + * the ElasticSearch mapping + * @param documentType + * the name of the index type + * @return a table definition for ElasticSearch. + */ + private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) { + final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties); + return new SimpleTableDef(documentType, metaData.getColumnNames(), + metaData.getColumnTypes()); + } + + @Override + protected Schema getMainSchema() throws MetaModelException { + final MutableSchema theSchema = new MutableSchema(getMainSchemaName()); + for (final SimpleTableDef tableDef : staticTableDefinitions) { + addTable(theSchema, tableDef); + } + + final SimpleTableDef[] tables = detectSchema(); + synchronized (this) { + dynamicTableDefinitions.clear(); + dynamicTableDefinitions.addAll(Arrays.asList(tables)); + for (final SimpleTableDef tableDef : dynamicTableDefinitions) { + final List<String> tableNames = Arrays.asList(theSchema.getTableNames()); + + if (!tableNames.contains(tableDef.getName())) { + addTable(theSchema, tableDef); + } + } + } + + return theSchema; + } + + private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) { + final MutableTable table = tableDef.toTable().setSchema(theSchema); + final Column idColumn = table.getColumnByName(FIELD_ID); + if (idColumn != null && idColumn instanceof MutableColumn) { + final MutableColumn mutableColumn = (MutableColumn) idColumn; + mutableColumn.setPrimaryKey(true); + } + theSchema.addTable(table); + } + + @Override + protected String getMainSchemaName() throws MetaModelException { + return indexName; + } + + @Override + protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, + List<FilterItem> whereItems, int firstRow, int maxRows) { + final QueryBuilder queryBuilder = ElasticSearchUtils + .createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND); + if (queryBuilder != null) { + // where clause can be pushed down to an ElasticSearch query + SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder); + SearchResult result = executeSearch(table, searchSourceBuilder, false); + + return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems); + } + return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); + } + + private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) { + Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(table.getName()); + if(scroll){ + builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL); + } + + Search search = builder.build(); + SearchResult result; + try { + result = elasticSearchClient.execute(search); + } catch (Exception e){ + logger.warn("Could not execute ElasticSearch query", e); + throw new MetaModelException("Could not execute ElasticSearch query", e); + } + return result; + } + + @Override + protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) { + SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), limitMaxRowsIsSet(maxRows)); + + return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns); + } + + private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) { + final SearchSourceBuilder searchRequest = new SearchSourceBuilder(); + if (firstRow > 1) { + final int zeroBasedFrom = firstRow - 1; + searchRequest.from(zeroBasedFrom); + } + if (limitMaxRowsIsSet(maxRows)) { + searchRequest.size(maxRows); + } + + if (queryBuilder != null) { + searchRequest.query(queryBuilder); + } + + return searchRequest; + } + + @Override + protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn, + Object keyValue) { + if (keyValue == null) { + return null; + } + + final String documentType = table.getName(); + final String id = keyValue.toString(); + + final Get get = new Get.Builder(indexName, id).type(documentType).build(); + final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get); + + final DataSetHeader header = new SimpleDataSetHeader(selectItems); + + return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header); + } + + @Override + protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) { + if (!whereItems.isEmpty()) { + // not supported - will have to be done by counting client-side + return null; + } + final String documentType = table.getName(); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(QueryBuilders.termQuery("_type", documentType)); + + Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build(); + + CountResult countResult; + try { + countResult = elasticSearchClient.execute(count); + } catch (Exception e){ + logger.warn("Could not execute ElasticSearch get query", e); + throw new MetaModelException("Could not execute ElasticSearch get query", e); + } + + return countResult.getCount(); + } + + private boolean limitMaxRowsIsSet(int maxRows) { + return (maxRows != -1); + } + + @Override + public void executeUpdate(UpdateScript update) { + final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this); + update.run(callback); + callback.onExecuteUpdateFinished(); + } + + /** + * Gets the {@link JestClient} that this {@link DataContext} is wrapping. + */ + public JestClient getElasticSearchClient() { + return elasticSearchClient; + } + + /** + * Gets the name of the index that this {@link DataContext} is working on. + */ + public String getIndexName() { + return indexName; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java new file mode 100644 index 0000000..1bb026d --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.action.Action; +import io.searchbox.client.JestClient; +import io.searchbox.client.JestResult; +import org.apache.metamodel.MetaModelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +final class JestClientExecutor { + private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class); + + static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) { + return execute(jestClient, clientRequest, true); + } + + static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) { + try { + final T result = jestClient.execute(clientRequest); + logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded()); + return result; + } catch (IOException e) { + logger.warn("Could not execute command {} ", clientRequest, e); + if (doThrow) { + throw new MetaModelException("Could not execute command " + clientRequest, e); + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java new file mode 100644 index 0000000..cc42b07 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.action.GenericResultAbstractAction; + +public class JestDeleteScroll extends GenericResultAbstractAction { + private JestDeleteScroll(Builder builder) { + super(builder); + this.payload = builder.getScrollId(); + setURI(buildURI()); + } + + @Override + public String getRestMethodName() { + return "DELETE"; + } + + @Override + protected String buildURI() { + return super.buildURI() + "/_search/scroll"; + } + + public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> { + private final String scrollId; + + public Builder(String scrollId) { + this.scrollId = scrollId; + } + + @Override + public JestDeleteScroll build() { + return new JestDeleteScroll(this); + } + + public String getScrollId() { + return scrollId; + } + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java new file mode 100644 index 0000000..cc26c8d --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.indices.mapping.PutMapping; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.create.AbstractTableCreationBuilder; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.schema.*; + +import java.util.List; + +final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> { + public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema, String name) { + super(updateCallback, schema, name); + } + + @Override + public Table execute() throws MetaModelException { + final MutableTable table = getTable(); + final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table); + + final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext(); + final String indexName = dataContext.getIndexName(); + + final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), sourceProperties).build(); + JestClientExecutor.execute(dataContext.getElasticSearchClient(), putMapping); + + final MutableSchema schema = (MutableSchema) getSchema(); + schema.addTable(table); + return table; + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java new file mode 100644 index 0000000..9678b48 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import io.searchbox.client.JestClient; +import io.searchbox.client.JestResult; +import io.searchbox.core.SearchScroll; +import org.apache.metamodel.data.AbstractDataSet; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * {@link DataSet} implementation for ElasticSearch + */ +final class JestElasticSearchDataSet extends AbstractDataSet { + + private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDataSet.class); + + private final JestClient _client; + private final AtomicBoolean _closed; + + private JestResult _searchResponse; + private JsonObject _currentHit; + private int _hitIndex = 0; + + public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, List<SelectItem> selectItems) { + super(selectItems); + _client = client; + _searchResponse = searchResponse; + _closed = new AtomicBoolean(false); + } + + public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, Column[] columns) { + super(columns); + _client = client; + _searchResponse = searchResponse; + _closed = new AtomicBoolean(false); + } + + @Override + public void close() { + super.close(); + boolean closeNow = _closed.compareAndSet(true, false); + if (closeNow) { + final String scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id").getAsString(); + JestClientExecutor.execute(_client, new JestDeleteScroll.Builder(scrollId).build(), false); + } + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + if (!_closed.get()) { + logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this); + close(); + } + } + + @Override + public boolean next() { + final JsonArray hits = _searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); + if (hits.size() == 0) { + // break condition for the scroll + _currentHit = null; + return false; + } + + if (_hitIndex < hits.size()) { + // pick the next hit within this search response + _currentHit = hits.get(_hitIndex).getAsJsonObject(); + _hitIndex++; + return true; + } + + final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id"); + if (scrollId == null) { + // this search response is not scrollable - then it's the end. + _currentHit = null; + return false; + } + + // try to scroll to the next set of hits + SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build(); + + _searchResponse = JestClientExecutor.execute(_client, scroll); + + // start over (recursively) + _hitIndex = 0; + return next(); + } + + @Override + public Row getRow() { + if (_currentHit == null) { + return null; + } + + final JsonObject source = _currentHit.getAsJsonObject("_source"); + final String documentId = _currentHit.get("_id").getAsString(); + return JestElasticSearchUtils.createRow(source, documentId, getHeader()); + + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java new file mode 100644 index 0000000..a4c0c03 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.core.DeleteByQuery; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.delete.AbstractRowDeletionBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.util.List; + +/** + * {@link RowDeletionBuilder} implementation for + * {@link ElasticSearchRestDataContext}. + */ +final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { + private final JestElasticSearchUpdateCallback _updateCallback; + + public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + final Table table = getTable(); + final String documentType = table.getName(); + + final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext(); + final String indexName = dataContext.getIndexName(); + + final List<FilterItem> whereItems = getWhereItems(); + + // delete by query - note that creteQueryBuilderForSimpleWhere may + // return matchAllQuery() if no where items are present. + final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, + LogicalOperator.AND); + if (queryBuilder == null) { + // TODO: The where items could not be pushed down to a query. We + // could solve this by running a query first, gather all + // document IDs and then delete by IDs. + throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " + + whereItems); + } + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + + final DeleteByQuery deleteByQuery = + new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType( + documentType).build(); + + JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteByQuery); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java new file mode 100644 index 0000000..d4ddd19 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.indices.mapping.DeleteMapping; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.drop.AbstractTableDropBuilder; +import org.apache.metamodel.drop.TableDropBuilder; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link TableDropBuilder} for dropping tables (document types) in an + * ElasticSearch index. + */ +final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder { + + private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class); + + private final JestElasticSearchUpdateCallback _updateCallback; + + public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + + final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext(); + final Table table = getTable(); + final String documentType = table.getName(); + logger.info("Deleting mapping / document type: {}", documentType); + + final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build(); + + JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteIndex); + + final MutableSchema schema = (MutableSchema) table.getSchema(); + schema.removeTable(table); + + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java new file mode 100644 index 0000000..327d7d3 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.core.DocumentResult; +import io.searchbox.core.Index; +import io.searchbox.params.Parameters; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.insert.AbstractRowInsertionBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> { + + private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchInsertBuilder.class); + + public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { + super(updateCallback, table); + } + + @Override + public void execute() throws MetaModelException { + final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext(); + final String indexName = dataContext.getIndexName(); + final String documentType = getTable().getName(); + + + final Map<String, Object> source = new HashMap<>(); + final Column[] columns = getColumns(); + final Object[] values = getValues(); + String id = null; + for (int i = 0; i < columns.length; i++) { + if (isSet(columns[i])) { + final String name = columns[i].getName(); + final Object value = values[i]; + if (ElasticSearchRestDataContext.FIELD_ID.equals(name)) { + if (value != null) { + id = value.toString(); + } + } else { + source.put(name, value); + } + } + } + + assert !source.isEmpty(); + + Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter( + Parameters.OP_TYPE, "create").build(); + + final DocumentResult result = JestClientExecutor.execute(dataContext.getElasticSearchClient(), index); + + logger.debug("Inserted document: id={}", result.getId()); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java new file mode 100644 index 0000000..074de2e --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.schema.ColumnType; + +import java.util.Map.Entry; + +/** + * Parser that transforms the ElasticSearch metadata response (json-like format) + * into an ElasticSearchMetaData object. + */ +final class JestElasticSearchMetaDataParser { + + /** + * Parses the ElasticSearch meta data info into an ElasticSearchMetaData + * object. This method makes much easier to create the ElasticSearch schema. + * + * @param metaDataInfo + * ElasticSearch mapping metadata in Map format + * @return An ElasticSearchMetaData object + */ + public static ElasticSearchMetaData parse(JsonObject metaDataInfo) { + final int columns = metaDataInfo.entrySet().size() + 1; + final String[] fieldNames = new String[columns]; + final ColumnType[] columnTypes = new ColumnType[columns]; + + // add the document ID field (fixed) + fieldNames[0] = ElasticSearchRestDataContext.FIELD_ID; + columnTypes[0] = ColumnType.STRING; + + int i = 1; + for (Entry<String, JsonElement> metaDataField : metaDataInfo.entrySet()) { + JsonElement fieldMetadata = metaDataField.getValue(); + + fieldNames[i] = metaDataField.getKey(); + columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata); + i++; + + } + return new ElasticSearchMetaData(fieldNames, columnTypes); + } + + private static ColumnType getColumnTypeFromMetadataField(JsonElement fieldMetadata) { + final JsonElement typeElement = ((JsonObject) fieldMetadata).get("type"); + if (typeElement != null) { + String metaDataFieldType = typeElement.getAsString(); + + return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType); + } else { + return ColumnType.STRING; + } + } +}