http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java new file mode 100644 index 0000000..a3e9f1c --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor;
+
+/**
+ * View model describing a data load: target database and table, the input path,
+ * load options and the overwrite flag. Instances are plain mutable beans; use
+ * {@link #builder()} to assemble one fluently.
+ */
+public class LoadRequest {
+
+  private String databaseName;
+  private String tableName;
+  private String inputPath;
+  private Map<String, String> options;
+  private boolean isOverwrite;
+
+  /** Creates an empty request whose properties are filled in through the setters. */
+  public LoadRequest() {
+  }
+
+  /**
+   * Creates a fully populated request.
+   *
+   * @param databaseName database of the target table
+   * @param tableName name of the target table
+   * @param inputPath path of the data to load
+   * @param options load options, stored as given (not copied)
+   * @param isOverwrite overwrite flag forwarded with the load
+   */
+  public LoadRequest(String databaseName, String tableName, String inputPath,
+      Map<String, String> options, boolean isOverwrite) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.inputPath = inputPath;
+    this.options = options;
+    this.isOverwrite = isOverwrite;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(String inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+
+  public boolean isOverwrite() {
+    return isOverwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    isOverwrite = overwrite;
+  }
+
+  /**
+   * Converts this request into the descriptor used by the service layer.
+   *
+   * @return a new {@link LoadDescriptor} built from the current property values
+   */
+  public LoadDescriptor convertToDto() {
+    return new LoadDescriptor(databaseName, tableName, inputPath, options, isOverwrite);
+  }
+
+  /**
+   * Fluent builder for {@link LoadRequest}. The builder only collects values;
+   * every {@link #create()} call produces an independent request, so a builder
+   * can be reused without affecting requests it created earlier.
+   */
+  public static class Builder {
+    private final Map<String, String> options = new HashMap<>();
+    private String databaseName;
+    private String tableName;
+    private String inputPath;
+    private boolean isOverwrite;
+
+    private Builder() {
+    }
+
+    public Builder databaseName(String databaseName) {
+      this.databaseName = databaseName;
+      return this;
+    }
+
+    public Builder tableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public Builder overwrite(boolean isOverwrite) {
+      this.isOverwrite = isOverwrite;
+      return this;
+    }
+
+    public Builder inputPath(String inputPath) {
+      this.inputPath = inputPath;
+      return this;
+    }
+
+    /** Adds one load option; a later value for the same key replaces the earlier one. */
+    public Builder options(String key, String value) {
+      options.put(key, value);
+      return this;
+    }
+
+    /**
+     * Builds the request from the values collected so far.
+     *
+     * @return a new request holding its own copy of the collected options
+     */
+    public LoadRequest create() {
+      // copy the map so later options(..) calls cannot leak into this request
+      return new LoadRequest(
+          databaseName, tableName, inputPath, new HashMap<>(options), isOverwrite);
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java new file mode 100644 index 0000000..3d5b3df --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor;
+
+/**
+ * View model describing a select: target database and table, the columns to
+ * return, an optional filter expression and a row limit. Instances are plain
+ * mutable beans; use {@link #builder()} to assemble one fluently.
+ */
+public class SelectRequest {
+
+  private String databaseName;
+  private String tableName;
+  private String[] select;
+  private String filter;
+  private int limit;
+
+  /** Creates an empty request whose properties are filled in through the setters. */
+  public SelectRequest() {
+
+  }
+
+  /**
+   * Creates a fully populated request.
+   *
+   * @param databaseName database of the table to read
+   * @param tableName name of the table to read
+   * @param select names of the columns to return, stored as given (not copied)
+   * @param filter filter expression text, may be {@code null}
+   * @param limit row limit forwarded with the select
+   */
+  public SelectRequest(String databaseName, String tableName, String[] select, String filter,
+      int limit) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.select = select;
+    this.filter = filter;
+    this.limit = limit;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String[] getSelect() {
+    return select;
+  }
+
+  public void setSelect(String[] select) {
+    this.select = select;
+  }
+
+  public String getFilter() {
+    return filter;
+  }
+
+  public void setFilter(String filter) {
+    this.filter = filter;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Converts this request into the descriptor used by the service layer.
+   *
+   * @return a new {@link SelectDescriptor} built from the current property values
+   */
+  public SelectDescriptor convertToDto() {
+    return new SelectDescriptor(
+        databaseName, tableName, select, filter, limit);
+  }
+
+  /**
+   * Fluent builder for {@link SelectRequest}. The builder only collects values;
+   * every {@link #create()} call produces an independent request, so a builder
+   * can be reused without affecting requests it created earlier.
+   */
+  public static class Builder {
+
+    private String databaseName;
+    private String tableName;
+    private String[] columnNames;
+    private String filter;
+    private int limit;
+
+    private Builder() {
+    }
+
+    public Builder databaseName(String databaseName) {
+      this.databaseName = databaseName;
+      return this;
+    }
+
+    public Builder tableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public Builder select(String... columnNames) {
+      this.columnNames = columnNames;
+      return this;
+    }
+
+    public Builder filter(String filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    public Builder limit(int limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    /**
+     * Builds the request from the values collected so far.
+     *
+     * @return a new request holding its own copy of the selected column names
+     */
+    public SelectRequest create() {
+      // copy the array so a later select(..) call or caller-side write cannot leak in
+      String[] columns = columnNames == null ? null : columnNames.clone();
+      return new SelectRequest(databaseName, tableName, columns, filter, limit);
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
new file mode 100644
index 0000000..edf584a
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+/**
+ * View model carrying the outcome of a select: an identifier for the select
+ * and the returned rows, one {@code Object[]} per row.
+ */
+public class SelectResponse {
+
+  private Object[][] rows;
+  private String selectId;
+
+  /** Creates an empty response; both properties start out as {@code null}. */
+  public SelectResponse() {
+
+  }
+
+  /**
+   * Creates a populated response.
+   *
+   * @param selectId identifier of the select
+   * @param rows result rows, stored as given (not copied)
+   */
+  public SelectResponse(String selectId, Object[][] rows) {
+    this.rows = rows;
+    this.selectId = selectId;
+  }
+
+  /** @return the result rows, exactly as they were set */
+  public Object[][] getRows() {
+    return this.rows;
+  }
+
+  /** @param rows result rows, stored as given (not copied) */
+  public void setRows(Object[][] rows) {
+    this.rows = rows;
+  }
+
+  /** @return the identifier of the select */
+  public String getSelectId() {
+    return this.selectId;
+  }
+
+  /** @param selectId identifier of the select */
+  public void setSelectId(String selectId) {
+    this.selectId = selectId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
new file mode 100644
index 0000000..7495ca3
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.antlr.ANTLRNoCaseStringStream;
+import org.apache.carbondata.horizon.antlr.FilterVisitor;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;
+import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor;
+import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor;
+import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.store.exception.StoreException;
+import org.apache.carbondata.store.master.Master;
+
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+
+/**
+ * Service layer of Horizon: translates table, load and select descriptors into
+ * calls on the store {@link Master}. Obtain the shared instance through
+ * {@link #getInstance()}.
+ */
+public class HorizonService {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HorizonService.class.getName());
+
+  private static HorizonService instance;
+
+  private Master master;
+
+  private HorizonService() {
+    // NOTE(review): no configuration is passed here; presumably the Master has already
+    // been created with a real StoreConf before this service is first used - confirm
+    master = Master.getInstance(null);
+  }
+
+  /**
+   * Builds the table schema described by {@code tableDescriptor} and asks the master
+   * to create the table.
+   *
+   * @param tableDescriptor name, database, schema and properties of the table
+   * @return the result reported by {@code Master.createTable}
+   * @throws StoreException if the sort columns in the properties are malformed or the
+   *         master fails with an {@link IOException}
+   */
+  public boolean createTable(TableDescriptor tableDescriptor) throws StoreException {
+    TableSchemaBuilder builder = TableSchema.builder();
+    builder.tableName(tableDescriptor.getName()).properties(tableDescriptor.getProperties());
+
+    Field[] fields = tableDescriptor.getSchema().getFields();
+    // sort_columns
+    List<String> sortColumnsList;
+    try {
+      sortColumnsList =
+          tableDescriptor.getSchema().prepareSortColumns(tableDescriptor.getProperties());
+    } catch (MalformedCarbonCommandException e) {
+      // log before translating, like the other failure paths of this class
+      LOGGER.error(e, "Invalid sort columns");
+      throw new StoreException(e.getMessage());
+    }
+    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+    // tableDescriptor schema
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+    builder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
+
+    TableSchema schema = builder.build();
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(tableDescriptor.getName());
+
+    TableInfo tableInfo = CarbonTable
+        .builder()
+        .databaseName(tableDescriptor.getDatabase())
+        .tableName(tableDescriptor.getName())
+        .tablePath(
+            master.getTableFolder(tableDescriptor.getDatabase(), tableDescriptor.getName()))
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .buildTableInfo();
+
+    try {
+      return master.createTable(tableInfo, tableDescriptor.isIfNotExists());
+    } catch (IOException e) {
+      LOGGER.error(e, "create tableDescriptor failed");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Builds a load model for the table named in {@code loadDescriptor} and asks the
+   * master to load the data at the descriptor's input path.
+   *
+   * @param loadDescriptor target table, input path, load options and overwrite flag
+   * @return the result reported by {@code Master.loadData}
+   * @throws StoreException if the load options are invalid or the load fails with an
+   *         {@link IOException}
+   */
+  public boolean loadData(LoadDescriptor loadDescriptor) throws StoreException {
+    CarbonTable table = master.getTable(
+        loadDescriptor.getDatabaseName(), loadDescriptor.getTableName());
+    CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
+    modelBuilder.setInputPath(loadDescriptor.getInputPath());
+    try {
+      CarbonLoadModel model =
+          modelBuilder.build(loadDescriptor.getOptions(), System.currentTimeMillis(), "0");
+
+      return master.loadData(model, loadDescriptor.isOverwrite());
+    } catch (InvalidLoadOptionException e) {
+      LOGGER.error(e, "Invalid loadDescriptor options");
+      throw new StoreException(e.getMessage());
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to loadDescriptor data");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs a select against the table named in {@code selectDescriptor}.
+   *
+   * @param selectDescriptor target table, projection, optional filter text and limit
+   * @return the rows returned by {@code Master.search}
+   * @throws StoreException if the select fails with an {@link IOException}
+   */
+  public CarbonRow[] select(SelectDescriptor selectDescriptor) throws StoreException {
+    try {
+      CarbonTable carbonTable = master.getTable(
+          selectDescriptor.getDatabaseName(), selectDescriptor.getTableName());
+      // NOTE(review): the limit is deliberately or accidentally passed for both of the
+      // last two arguments of Master.search - confirm against its signature
+      return master.search(
+          carbonTable,
+          selectDescriptor.getProjection(),
+          parseFilter(selectDescriptor.getFilter(), carbonTable),
+          selectDescriptor.getLimit(),
+          selectDescriptor.getLimit());
+    } catch (IOException e) {
+      LOGGER.error(e, "[" + selectDescriptor.getId() + "] select failed");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Parses filter text into a filter {@link Expression} for the given table.
+   *
+   * @param filter filter text; {@code null} means "no filter"
+   * @param carbonTable table handed to the {@link FilterVisitor}
+   * @return the parsed expression, or {@code null} when {@code filter} is {@code null}
+   */
+  public static Expression parseFilter(String filter, CarbonTable carbonTable) {
+    if (filter == null) {
+      return null;
+    }
+    CharStream input = new ANTLRNoCaseStringStream(filter);
+    ExpressionLexer lexer = new ExpressionLexer(input);
+    CommonTokenStream tokens = new CommonTokenStream(lexer);
+    ExpressionParser parser = new ExpressionParser(tokens);
+    ExpressionParser.ParseFilterContext tree = parser.parseFilter();
+    FilterVisitor visitor = new FilterVisitor(carbonTable);
+    return visitor.visitParseFilter(tree);
+  }
+
+  /**
+   * Returns the shared service instance, creating it on first use. The method is
+   * synchronized so that only one instance is ever created.
+   */
+  public static synchronized HorizonService getInstance() {
+    if (instance == null) {
+      instance = new HorizonService();
+    }
+    return instance;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java new file mode 100644 index 0000000..72ea0a7 --- /dev/null +++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.horizon;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor;
+import org.apache.carbondata.horizon.rest.service.HorizonService;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that {@link HorizonService#parseFilter} turns filter text into an
+ * {@link Expression} whose statement text matches the expected form.
+ */
+public class FilterParseTest {
+
+  /** Table the filters are parsed against; built once in {@link #setup()}. */
+  private static CarbonTable carbonTable;
+
+  /**
+   * Builds an in-memory {@link CarbonTable} with one column per tested data type.
+   * The schema is assembled with the same calls HorizonService.createTable uses,
+   * but with an empty table path and without going through a Master.
+   */
+  @BeforeClass
+  public static void setup() throws MalformedCarbonCommandException {
+
+    CreateTableRequest createTableRequest = CreateTableRequest
+        .builder()
+        .ifNotExists()
+        .databaseName("default")
+        .tableName("table_1")
+        .comment("first tableDescriptor")
+        .column("shortField", "SHORT", "short field")
+        .column("intField", "INT", "int field")
+        .column("bigintField", "LONG", "long field")
+        .column("doubleField", "DOUBLE", "double field")
+        .column("stringField", "STRING", "string field")
+        .column("timestampField", "TIMESTAMP", "timestamp field")
+        .column("decimalField", "DECIMAL", 18, 2, "decimal field")
+        .column("dateField", "DATE", "date field")
+        .column("charField", "CHAR", "char field")
+        .column("floatField", "FLOAT", "float field")
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+
+    TableDescriptor tableDescriptor = createTableRequest.convertToDto();
+
+    TableSchemaBuilder builder = TableSchema.builder();
+    builder.tableName(tableDescriptor.getName()).properties(tableDescriptor.getProperties());
+
+    Field[] fields = tableDescriptor.getSchema().getFields();
+    // resolve the sort columns from the table properties
+    List<String> sortColumnsList =
+        tableDescriptor.getSchema().prepareSortColumns(tableDescriptor.getProperties());
+    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+    // add the columns to the schema builder; sortColumnsSchemaList is passed in to be filled
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+    builder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
+
+    TableSchema schema = builder.build();
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(tableDescriptor.getName());
+
+    carbonTable = CarbonTable
+        .builder()
+        .databaseName(tableDescriptor.getDatabase())
+        .tableName(tableDescriptor.getName())
+        .tablePath("")
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .build();
+  }
+
+
+  /**
+   * Parses {@code sql1} and asserts that the statement text of the resulting
+   * expression equals {@code sql2}.
+   */
+  private void checkExpression(String sql1, String sql2) {
+    Expression expression = HorizonService.parseFilter(sql1, carbonTable);
+    Assert.assertEquals(sql2, expression.getStatement());
+  }
+
+  /** Asserts that {@code sql} parses to an expression with the identical statement text. */
+  private void checkExpression(String sql) {
+    checkExpression(sql, sql);
+  }
+
+  @Test
+  public void testFilterParse() {
+
+    // comparison operators: >, >=, <, <=, =, <>, != (!= is expected to be rendered as <>)
+    checkExpression("intField > 10");
+    checkExpression("intField >= 10");
+    checkExpression("intField < 10");
+    checkExpression("intField <= 10");
+    checkExpression("intField = 10");
+    checkExpression("stringField = 'carbon'");
+    checkExpression("intField <> 10");
+    checkExpression("stringField <> 'carbon'");
+    checkExpression("intField != 10", "intField <> 10");
+    checkExpression("stringField != 'carbon'", "stringField <> 'carbon'");
+
+    // is null, is not null
+    checkExpression("stringField is null");
+    checkExpression("stringField is not null");
+
+    // in, not in
+    checkExpression("intField in (10, 20, 30)" );
+    checkExpression("stringField in ('spark', 'carbon')" );
+    checkExpression("intField not in (10, 20, 30)" );
+    checkExpression("stringField not in ('spark', 'carbon')" );
+
+    // between and, not between and
+    checkExpression("intField between 10 and 30", "intField >= 10 and intField <= 30" );
+    // NOTE(review): "> 30 and < 10" can never be true; the negation of a between
+    // would be "< 10 or > 30". This pins the current visitor output - confirm
+    // whether the visitor (and then this expectation) should use "or".
+    checkExpression("intField not between 10 and 30", "intField > 30 and intField < 10" );
+
+    // and, or: the expected statement wraps each binary logical expression in parentheses
+    checkExpression(
+        "intField > 10 and stringField = 'carbon'",
+        "(intField > 10 and stringField = 'carbon')");
+    checkExpression(
+        "(intField > 10) and stringField = 'carbon'",
+        "(intField > 10 and stringField = 'carbon')");
+    checkExpression(
+        "(intField > 10) or stringField = 'carbon'",
+        "(intField > 10 or stringField = 'carbon')");
+    checkExpression(
+        "intField > -10 or (stringField = 'carbon' and floatField > 5.0)",
+        "(intField > -10 or (stringField = 'carbon' and floatField > 5.0))");
+
+    // data type: short, int, bigint, double, decimal, bigDecimal string, timestamp, date
+    // (literals with a type suffix are expected to be rendered without the suffix)
+    checkExpression("shortField = 1+S", "shortField = 1");
+    checkExpression("intField = 1");
+    checkExpression("bigintField = 1+L", "bigintField = 1");
+    checkExpression("doubleField = 1.01+D", "doubleField = 1.01");
+    checkExpression("decimalField = 1000.01001", "decimalField = 1000.01001");
+    checkExpression("decimalField = 1000.01001+BD", "decimalField = 1000.01001");
+    checkExpression("stringField = 'carbon'");
+    checkExpression("timestampField = '2018-01-01 10:01:01'");
+    checkExpression("dateField = '2018-01-01'");
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
----------------------------------------------------------------------
diff --git
a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java new file mode 100644 index 0000000..e57e813 --- /dev/null +++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.horizon;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.horizon.rest.controller.Horizon;
+import org.apache.carbondata.store.master.Master;
+import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.store.worker.Worker;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * End-to-end test: starts a master, the Horizon REST service and a worker, then
+ * creates a table, loads one CSV file and selects from it over HTTP.
+ */
+public class HorizonTest {
+
+  private static Master master;
+  private static Worker worker;
+  private static String serviceUri = "http://localhost:8080";
+  private static String projectFolder;
+
+  private static RestTemplate restTemplate;
+
+  /**
+   * Starts master, REST service and worker against a store location made unique
+   * by appending the current time to the configured one.
+   */
+  @BeforeClass
+  public static void setup() throws IOException, InterruptedException {
+    projectFolder = new File(HorizonTest.class.getResource("/").getPath() +
+        "../../../../").getCanonicalPath();
+    String log4jFile = projectFolder + "/store/conf/log4j.properties";
+    String confFile = projectFolder + "/store/conf/store.conf";
+
+    System.setProperty("log.path", projectFolder + "/store/core/target/master_worker.log");
+    StoreUtil.initLog4j(log4jFile);
+
+    StoreConf storeConf = new StoreConf(confFile);
+    storeConf.conf(
+        StoreConf.STORE_LOCATION,
+        storeConf.storeLocation() + System.currentTimeMillis());
+
+    // start master
+    master = Master.getInstance(storeConf);
+    master.startService();
+
+    // Horizon.main is run on its own thread so that setup can continue
+    new Thread() {
+      @Override
+      public void run() {
+        Horizon.main(new String[0]);
+      }
+    }.start();
+    // NOTE(review): fixed wait for the REST service to come up; slow on every run and
+    // may still be too short on a loaded machine - replace with a readiness check
+    Thread.sleep(10000);
+
+    // start worker
+    worker = new Worker(storeConf);
+    worker.start();
+
+    restTemplate = new RestTemplate();
+  }
+
+  @Test
+  public void testHorizon() {
+    // create table if not exists
+    CreateTableRequest table = CreateTableRequest
+        .builder()
+        .ifNotExists()
+        .databaseName("default")
+        .tableName("table_1")
+        .comment("first table")
+        .column("shortField", "SHORT", "short field")
+        .column("intField", "INT", "int field")
+        .column("bigintField", "LONG", "long field")
+        .column("doubleField", "DOUBLE", "double field")
+        .column("stringField", "STRING", "string field")
+        .column("timestampField", "TIMESTAMP", "timestamp field")
+        .column("decimalField", "DECIMAL", 18, 2, "decimal field")
+        .column("dateField", "DATE", "date field")
+        .column("charField", "CHAR", "char field")
+        .column("floatField", "FLOAT", "float field")
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+    String createTable =
+        restTemplate.postForObject(serviceUri + "/table/create", table, String.class);
+    // parseBoolean treats a null response as false, exactly like Boolean.valueOf did
+    Assert.assertTrue(Boolean.parseBoolean(createTable));
+
+    // load one segment
+    LoadRequest load = LoadRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .overwrite(false)
+        .inputPath(projectFolder + "/store/horizon/src/test/resources/data1.csv")
+        .options("header", "true")
+        .create();
+    String loadData =
+        restTemplate.postForObject(serviceUri + "/table/load", load, String.class);
+    Assert.assertTrue(Boolean.parseBoolean(loadData));
+
+    // select row: data1.csv holds 10 rows, so the limit of 5 decides the row count
+    SelectRequest select = SelectRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .select("intField", "stringField")
+        .limit(5)
+        .create();
+    SelectResponse result =
+        restTemplate.postForObject(serviceUri + "/table/select", select, SelectResponse.class);
+    Assert.assertEquals(5, result.getRows().length);
+
+    // select row with filter: exactly one row of data1.csv has intField = 11
+    SelectRequest filter = SelectRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .select("intField", "stringField")
+        .filter("intField = 11")
+        .limit(5)
+        .create();
+    SelectResponse filterResult =
+        restTemplate.postForObject(serviceUri + "/table/select", filter, SelectResponse.class);
+    Assert.assertEquals(1, filterResult.getRows().length);
+  }
+
+  /** Stops worker, REST service and master, in that order. */
+  @AfterClass
+  public static void release() throws InterruptedException {
+    worker.stop();
+    Horizon.close();
+    master.stopService();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/test/resources/data1.csv
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/resources/data1.csv b/store/horizon/src/test/resources/data1.csv
new file mode 100644
index 0000000..cf732eb
--- /dev/null
+++ b/store/horizon/src/test/resources/data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField
+1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5
+5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5
+1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5
+1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5
+1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5
+3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5
+2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5
+1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5
+4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5
+1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..c3967f4 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java @@ -40,13 +40,13 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; * Implementation to write rows in CSV format to carbondata file. */ @InterfaceAudience.Internal -class CSVCarbonWriter extends CarbonWriter { +public class CSVCarbonWriter extends CarbonWriter { private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; private TaskAttemptContext context; private ObjectArrayWritable writable; - CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException { + public CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException { Configuration hadoopConf = new Configuration(); CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel); CarbonTableOutputFormat format = new CarbonTableOutputFormat(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index d4b1c5b..4297e30 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -463,7 +463,7 @@ public class CarbonWriterBuilder { return table; } - private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder, + public static void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder, List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) { Set<String> uniqueFields = new HashSet<>(); // a counter which will be used in case of complex array type. 
This valIndex will be assigned http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java index 6131d45..c9622e1 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java @@ -18,18 +18,26 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonUtil; import com.google.gson.GsonBuilder; import com.google.gson.TypeAdapter; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; +import org.apache.commons.lang.StringUtils; /** * A schema used to write and read data files @@ -108,4 +116,63 @@ public class Schema { }); return this; } + + public List<String> prepareSortColumns(Map<String, String> properties) + throws MalformedCarbonCommandException { + + List<String> sortColumnsList = new ArrayList<>(); + Set<Map.Entry<String, String>> entries = properties.entrySet(); + String sortKeyString = null; + for (Map.Entry<String, String> entry : entries) { + if (CarbonCommonConstants.SORT_COLUMNS.equalsIgnoreCase(entry.getKey())) { + 
sortKeyString = CarbonUtil.unquoteChar(entry.getValue()).trim(); + } + } + + if (sortKeyString != null) { + String[] sortKeys = sortKeyString.split(",", -1); + for (int i = 0; i < sortKeys.length; i++) { + sortKeys[i] = sortKeys[i].trim().toLowerCase(); + if (StringUtils.isEmpty(sortKeys[i])) { + throw new MalformedCarbonCommandException("SORT_COLUMNS contains illegal argument."); + } + } + + for (int i = sortKeys.length - 2; i >= 0; i--) { + for (int j = i + 1; j < sortKeys.length; j++) { + if (sortKeys[i].equals(sortKeys[j])) { + throw new MalformedCarbonCommandException( + "SORT_COLUMNS Either having duplicate columns : " + sortKeys[i]); + } + } + } + + for (int i = sortKeys.length - 1; i >= 0; i--) { + boolean isExists = false; + for (int j = fields.length - 1; j >= 0; j--) { + if (sortKeys[i].equalsIgnoreCase(fields[j].getFieldName())) { + sortKeys[i] = fields[j].getFieldName(); + isExists = true; + break; + } + } + if (!isExists) { + String message = "sort_columns: " + sortKeys[i] + + " does not exist in table. Please check create table statement."; + throw new MalformedCarbonCommandException(message); + } + } + sortColumnsList = Arrays.asList(sortKeys); + } else { + for (Field field : fields) { + if (null != field) { + if (field.getDataType() == DataTypes.STRING || field.getDataType() == DataTypes.DATE + || field.getDataType() == DataTypes.TIMESTAMP) { + sortColumnsList.add(field.getFieldName()); + } + } + } + } + return sortColumnsList; + } }
