Repository: carbondata Updated Branches: refs/heads/metadata 191dfb351 -> 4b69c9d8e
[CARBONDATA-1286] Change Query related RDD to use TableInfo This closes #1146 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b69c9d8 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b69c9d8 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b69c9d8 Branch: refs/heads/metadata Commit: 4b69c9d8e00da4053196250357e67add5088d639 Parents: 191dfb3 Author: jackylk <[email protected]> Authored: Sat Jul 8 17:34:44 2017 +0800 Committer: ravipesala <[email protected]> Committed: Tue Jul 11 15:06:38 2017 +0530 ---------------------------------------------------------------------- .../core/metadata/CarbonMetadata.java | 3 +- .../ThriftWrapperSchemaConverterImpl.java | 8 +- .../core/metadata/datatype/DataType.java | 44 +++++++- .../core/metadata/encoder/Encoding.java | 20 ++++ .../core/metadata/schema/SchemaEvolution.java | 1 + .../core/metadata/schema/table/CarbonTable.java | 105 ++++-------------- .../core/metadata/schema/table/TableInfo.java | 106 +++++++++++++++---- .../core/metadata/schema/table/TableSchema.java | 29 ++++- .../core/metadata/schema/table/Writable.java | 31 ++++++ .../metadata/schema/table/WritableUtil.java | 45 ++++++++ .../schema/table/column/ColumnSchema.java | 84 ++++++++++++--- .../dictionary/client/DictionaryClientTest.java | 3 +- ...ncrementalColumnDictionaryGeneratorTest.java | 3 +- .../ServerDictionaryGeneratorTest.java | 3 +- .../generator/TableDictionaryGeneratorTest.java | 3 +- .../core/metadata/CarbonMetadataTest.java | 6 +- .../ThriftWrapperSchemaConverterImplTest.java | 9 +- .../metadata/schema/table/CarbonTableTest.java | 3 +- .../table/CarbonTableWithComplexTypesTest.java | 3 +- .../carbondata/hadoop/CarbonInputFormat.java | 93 ++++++++-------- .../hadoop/ft/CarbonInputMapperTest.java | 2 +- .../hadoop/test/util/StoreCreator.java | 1 - .../hive/MapredCarbonInputFormat.java | 16 +++ .../testsuite/createTable/TestTableIdTest.scala | 76 
------------- .../apache/carbondata/spark/rdd/CarbonRDD.scala | 17 +++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 14 +-- .../spark/rdd/DataManagementFunc.scala | 1 - .../execution/command/carbonTableSchema.scala | 1 - .../scala/org/apache/spark/sql/CarbonScan.scala | 4 +- .../execution/command/carbonTableSchema.scala | 2 - .../sql/CarbonDatasourceHadoopRelation.scala | 42 +++++--- .../spark/sql/CarbonDictionaryDecoder.scala | 15 ++- .../execution/CarbonLateDecodeStrategy.scala | 19 ++-- .../sql/execution/command/IUDCommands.scala | 8 +- .../execution/command/carbonTableSchema.scala | 2 - .../processing/model/CarbonLoadModel.java | 7 -- .../carbondata/processing/StoreCreator.java | 1 - 37 files changed, 498 insertions(+), 332 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java index 079540f..75fe78b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java @@ -78,8 +78,7 @@ public final class CarbonMetadata { CarbonTable carbonTable = tableInfoMap.get(convertToLowerCase(tableInfo.getTableUniqueName())); if (null == carbonTable || carbonTable.getTableLastUpdatedTime() < tableInfo .getLastUpdatedTime()) { - carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(tableInfo); + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); tableInfoMap.put(convertToLowerCase(tableInfo.getTableUniqueName()), carbonTable); } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 2d5f395..5f71582 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -269,12 +269,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { org.apache.carbondata.format.TableSchema thriftFactTable = fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable()); - List<org.apache.carbondata.format.TableSchema> thriftAggTables = - new ArrayList<org.apache.carbondata.format.TableSchema>(); - for (TableSchema wrapperAggTableSchema : wrapperTableInfo.getAggregateTableList()) { - thriftAggTables.add(fromWrapperToExternalTableSchema(wrapperAggTableSchema)); - } - return new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables); + return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache + .carbondata.format.TableSchema>()); } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java index 52e63dc..3a7f75d 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java @@ -46,8 +46,8 @@ public enum DataType { // size of the value of this data type, negative value means variable length private int sizeInBytes; - DataType(int value ,String name, int sizeInBytes) { - this.precedenceOrder = value; + DataType(int precedenceOrder, String name, int sizeInBytes) { + this.precedenceOrder = precedenceOrder; this.name = name; this.sizeInBytes = sizeInBytes; } @@ -71,4 +71,44 @@ public enum DataType { public int getSizeBits() { return (int) (Math.log(getSizeInBytes()) / Math.log(2)); } + + public static DataType valueOf(int ordinal) { + if (ordinal == STRING.ordinal()) { + return STRING; + } else if (ordinal == DATE.ordinal()) { + return DATE; + } else if (ordinal == TIMESTAMP.ordinal()) { + return TIMESTAMP; + } else if (ordinal == BOOLEAN.ordinal()) { + return BOOLEAN; + } else if (ordinal == SHORT.ordinal()) { + return SHORT; + } else if (ordinal == INT.ordinal()) { + return INT; + } else if (ordinal == FLOAT.ordinal()) { + return FLOAT; + } else if (ordinal == LONG.ordinal()) { + return LONG; + } else if (ordinal == DOUBLE.ordinal()) { + return DOUBLE; + } else if (ordinal == NULL.ordinal()) { + return NULL; + } else if (ordinal == DECIMAL.ordinal()) { + return DECIMAL; + } else if (ordinal == ARRAY.ordinal()) { + return ARRAY; + } else if (ordinal == STRUCT.ordinal()) { + return STRUCT; + } else if (ordinal == MAP.ordinal()) { + return MAP; + } else if (ordinal == BYTE.ordinal()) { + return BYTE; + } else if (ordinal == BYTE_ARRAY.ordinal()) { + return BYTE_ARRAY; + } else if (ordinal == SHORT_INT.ordinal()) { + return SHORT_INT; + } else { + throw new RuntimeException("create DataType with invalid ordinal: " + ordinal); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java index fe24975..e3d7a9a 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java @@ -27,4 +27,24 @@ public enum Encoding { BIT_PACKED, DIRECT_DICTIONARY, IMPLICIT; + + public static Encoding valueOf(int ordinal) { + if (ordinal == DICTIONARY.ordinal()) { + return DICTIONARY; + } else if (ordinal == DELTA.ordinal()) { + return DELTA; + } else if (ordinal == RLE.ordinal()) { + return RLE; + } else if (ordinal == INVERTED_INDEX.ordinal()) { + return INVERTED_INDEX; + } else if (ordinal == BIT_PACKED.ordinal()) { + return BIT_PACKED; + } else if (ordinal == DIRECT_DICTIONARY.ordinal()) { + return DIRECT_DICTIONARY; + } else if (ordinal == IMPLICIT.ordinal()) { + return IMPLICIT; + } else { + throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java index 4bdbe8d..6960736 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java @@ -47,4 +47,5 @@ public class SchemaEvolution implements Serializable { public void setSchemaEvolutionEntryList(List<SchemaEvolutionEntry> schemaEvolutionEntryList) { this.schemaEvolutionEntryList = schemaEvolutionEntryList; } + } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 16ded57..01b3022 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -20,8 +20,6 @@ package org.apache.carbondata.core.metadata.schema.table; import java.io.Serializable; import java.util.*; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; @@ -33,7 +31,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.stats.PartitionStatistic; /** * Mapping class for Carbon actual table @@ -41,15 +38,14 @@ import org.apache.carbondata.core.stats.PartitionStatistic; public class CarbonTable implements Serializable { /** - * serialization id + * the cached table info */ - private static final long serialVersionUID = 8696507171227156445L; + private TableInfo tableInfo; /** - * Attribute for Carbon table LOGGER + * serialization id */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonTable.class.getName()); + private static final long 
serialVersionUID = 8696507171227156445L; /** * Absolute table identifier @@ -99,20 +95,11 @@ public class CarbonTable implements Serializable { private Map<String, PartitionInfo> tablePartitionMap; /** - * statistic information of partition table - */ - private PartitionStatistic partitionStatistic; - /** * tableUniqueName */ private String tableUniqueName; /** - * Aggregate tables name - */ - private List<String> aggregateTablesName; - - /** * metadata file path (check if it is really required ) */ private String metaDataFilepath; @@ -137,13 +124,12 @@ public class CarbonTable implements Serializable { */ private int numberOfNoDictSortColumns; - public CarbonTable() { + private CarbonTable() { this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>(); this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>(); this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>(); this.tableBucketMap = new HashMap<>(); this.tablePartitionMap = new HashMap<>(); - this.aggregateTablesName = new ArrayList<String>(); this.createOrderColumn = new HashMap<String, List<CarbonColumn>>(); this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>(); } @@ -151,39 +137,26 @@ public class CarbonTable implements Serializable { /** * @param tableInfo */ - public void loadCarbonTable(TableInfo tableInfo) { - this.blockSize = getTableBlockSizeInMB(tableInfo); - this.tableLastUpdatedTime = tableInfo.getLastUpdatedTime(); - this.tableUniqueName = tableInfo.getTableUniqueName(); - this.metaDataFilepath = tableInfo.getMetaDataFilepath(); - //setting unique table identifier - CarbonTableIdentifier carbontableIdentifier = - new CarbonTableIdentifier(tableInfo.getDatabaseName(), - tableInfo.getFactTable().getTableName(), tableInfo.getFactTable().getTableId()); - this.absoluteTableIdentifier = - new AbsoluteTableIdentifier(tableInfo.getStorePath(), carbontableIdentifier); - - fillDimensionsAndMeasuresForTables(tableInfo.getFactTable()); - 
fillCreateOrderColumn(tableInfo.getFactTable().getTableName()); - List<TableSchema> aggregateTableList = tableInfo.getAggregateTableList(); - for (TableSchema aggTable : aggregateTableList) { - this.aggregateTablesName.add(aggTable.getTableName()); - fillDimensionsAndMeasuresForTables(aggTable); - if (aggTable.getBucketingInfo() != null) { - tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo()); - } - if (aggTable.getPartitionInfo() != null) { - tablePartitionMap.put(aggTable.getTableName(), aggTable.getPartitionInfo()); - } - } + public static CarbonTable buildFromTableInfo(TableInfo tableInfo) { + CarbonTable table = new CarbonTable(); + table.tableInfo = tableInfo; + table.blockSize = tableInfo.getTableBlockSizeInMB(); + table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime(); + table.tableUniqueName = tableInfo.getTableUniqueName(); + table.metaDataFilepath = tableInfo.getMetaDataFilepath(); + table.absoluteTableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier(); + + table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable()); + table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName()); if (tableInfo.getFactTable().getBucketingInfo() != null) { - tableBucketMap.put(tableInfo.getFactTable().getTableName(), + table.tableBucketMap.put(tableInfo.getFactTable().getTableName(), tableInfo.getFactTable().getBucketingInfo()); } if (tableInfo.getFactTable().getPartitionInfo() != null) { - tablePartitionMap.put(tableInfo.getFactTable().getTableName(), + table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(), tableInfo.getFactTable().getPartitionInfo()); } + return table; } /** @@ -213,29 +186,6 @@ public class CarbonTable implements Serializable { this.createOrderColumn.put(tableName, columns); } - /** - * This method will return the table size. 
Default table block size will be considered - * in case not specified by the user - * - * @param tableInfo - * @return - */ - private int getTableBlockSizeInMB(TableInfo tableInfo) { - String tableBlockSize = null; - // In case of old store there will not be any map for table properties so table properties - // will be null - Map<String, String> tableProperties = tableInfo.getFactTable().getTableProperties(); - if (null != tableProperties) { - tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE); - } - if (null == tableBlockSize) { - tableBlockSize = CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL; - LOGGER.info("Table block size not specified for " + tableInfo.getTableUniqueName() - + ". Therefore considering the default value " - + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB"); - } - return Integer.parseInt(tableBlockSize); - } /** * Fill allDimensions and allMeasures for carbon table @@ -415,13 +365,6 @@ public class CarbonTable implements Serializable { } /** - * @return list of aggregate TablesName - */ - public List<String> getAggregateTablesName() { - return aggregateTablesName; - } - - /** * @return the tableLastUpdatedTime */ public long getTableLastUpdatedTime() { @@ -597,10 +540,6 @@ public class CarbonTable implements Serializable { return null != tablePartitionMap.get(getFactTableName()); } - public PartitionStatistic getPartitionStatistic() { - return partitionStatistic; - } - /** * @return absolute table identifier */ @@ -708,8 +647,6 @@ public class CarbonTable implements Serializable { return sort_columsList; } - - public int getNumberOfSortColumns() { return numberOfSortColumns; } @@ -717,4 +654,8 @@ public class CarbonTable implements Serializable { public int getNumberOfNoDictSortColumns() { return numberOfNoDictSortColumns; } + + public TableInfo getTableInfo() { + return tableInfo; + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java index 328928d..78cd97b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java @@ -16,17 +16,30 @@ */ package org.apache.carbondata.core.metadata.schema.table; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; +import java.util.Map; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; /** * Store the information about the table. 
* it stores the fact table as well as aggregate table present in the schema */ -public class TableInfo implements Serializable { +public class TableInfo implements Serializable, Writable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(TableInfo.class.getName()); /** * serialization version @@ -49,11 +62,6 @@ public class TableInfo implements Serializable { private TableSchema factTable; /** - * list of aggregate table - */ - private List<TableSchema> aggregateTableList; - - /** * last updated time to update the table if any changes */ private long lastUpdatedTime; @@ -68,8 +76,10 @@ public class TableInfo implements Serializable { */ private String storePath; + // this identifier is a lazy field which will be created when it is used first time + private AbsoluteTableIdentifier identifier; + public TableInfo() { - aggregateTableList = new ArrayList<TableSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); } /** @@ -87,20 +97,6 @@ public class TableInfo implements Serializable { } /** - * @return the aggregateTableList - */ - public List<TableSchema> getAggregateTableList() { - return aggregateTableList; - } - - /** - * @param aggregateTableList the aggregateTableList to set - */ - public void setAggregateTableList(List<TableSchema> aggregateTableList) { - this.aggregateTableList = aggregateTableList; - } - - /** * @return the databaseName */ public String getDatabaseName() { @@ -206,4 +202,68 @@ public class TableInfo implements Serializable { } return true; } + + /** + * This method will return the table size. 
Default table block size will be considered + * in case not specified by the user + */ + int getTableBlockSizeInMB() { + String tableBlockSize = null; + // In case of old store there will not be any map for table properties so table properties + // will be null + Map<String, String> tableProperties = getFactTable().getTableProperties(); + if (null != tableProperties) { + tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE); + } + if (null == tableBlockSize) { + tableBlockSize = CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL; + LOGGER.info("Table block size not specified for " + getTableUniqueName() + + ". Therefore considering the default value " + + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB"); + } + return Integer.parseInt(tableBlockSize); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(databaseName); + out.writeUTF(tableUniqueName); + factTable.write(out); + out.writeLong(lastUpdatedTime); + out.writeUTF(metaDataFilepath); + out.writeUTF(storePath); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.databaseName = in.readUTF(); + this.tableUniqueName = in.readUTF(); + this.factTable = new TableSchema(); + this.factTable.readFields(in); + this.lastUpdatedTime = in.readLong(); + this.metaDataFilepath = in.readUTF(); + this.storePath = in.readUTF(); + } + + public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() { + if (identifier == null) { + CarbonTableIdentifier carbontableIdentifier = + new CarbonTableIdentifier(databaseName, factTable.getTableName(), factTable.getTableId()); + identifier = new AbsoluteTableIdentifier(storePath, carbontableIdentifier); + } + return identifier; + } + + public byte[] serialize() throws IOException { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + this.write(new DataOutputStream(bao)); + return bao.toByteArray(); + } + + public static TableInfo deserialize(byte[] bytes) throws IOException { + TableInfo 
tableInfo = new TableInfo(); + DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes)); + tableInfo.readFields(in); + return tableInfo; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java index f9d848e..a396d19 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java @@ -16,6 +16,9 @@ */ package org.apache.carbondata.core.metadata.schema.table; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -30,7 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; /** * Persisting the table information */ -public class TableSchema implements Serializable { +public class TableSchema implements Serializable, Writable { /** * serialization version @@ -198,4 +201,28 @@ public class TableSchema implements Serializable { public void setPartitionInfo(PartitionInfo partitionInfo) { this.partitionInfo = partitionInfo; } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(tableId); + out.writeUTF(tableName); + out.writeInt(listOfColumns.size()); + for (ColumnSchema column : listOfColumns) { + column.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.tableId = in.readUTF(); + this.tableName = in.readUTF(); + int listSize = in.readInt(); + this.listOfColumns = new ArrayList<>(listSize); + for (int i = 0; i < listSize; i++) { + ColumnSchema schema = new 
ColumnSchema(); + schema.readFields(in); + this.listOfColumns.add(schema); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java new file mode 100644 index 0000000..94a3b78 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.table; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +// The same interface as hadoop.io.Writable. 
We port the interface here to avoid hadoop package +// dependency +public interface Writable { + + void write(DataOutput out) throws IOException; + + void readFields(DataInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java new file mode 100644 index 0000000..dbcb1a7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.metadata.schema.table; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class WritableUtil { + + public static void writeByteArray(DataOutput out, byte[] bytes) throws IOException { + if (bytes == null) { + out.writeInt(-1); + } else { + out.writeInt(bytes.length); + out.write(bytes); + } + } + + public static byte[] readByteArray(DataInput in) throws IOException { + int length = in.readInt(); + if (length == -1) { + return null; + } else { + byte[] b = new byte[length]; + in.readFully(b); + return b; + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java index f5b8116..3680d53 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java @@ -16,17 +16,24 @@ */ package org.apache.carbondata.core.metadata.schema.table.column; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.Writable; +import org.apache.carbondata.core.metadata.schema.table.WritableUtil; /** * Store the information about the column meta data present the table */ -public class ColumnSchema implements Serializable { +public class ColumnSchema implements 
Serializable, Writable { /** * serialization version @@ -37,6 +44,7 @@ public class ColumnSchema implements Serializable { * dataType */ private DataType dataType; + /** * Name of the column. If it is a complex data type, we follow a naming rule * grand_parent_column.parent_column.child_column @@ -360,17 +368,6 @@ public class ColumnSchema implements Serializable { } /** - * @param property - * @return - */ - public String getColumnProperty(String property) { - if (null != columnProperties) { - return columnProperties.get(property); - } - return null; - } - - /** * return columnproperties */ public Map<String, String> getColumnProperties() { @@ -421,4 +418,67 @@ public class ColumnSchema implements Serializable { public void setSortColumn(boolean sortColumn) { isSortColumn = sortColumn; } + + @Override + public void write(DataOutput out) throws IOException { + out.writeShort(dataType.ordinal()); + out.writeUTF(columnName); + out.writeUTF(columnUniqueId); + out.writeUTF(columnReferenceId); + if (encodingList == null) { + out.writeShort(0); + } else { + out.writeShort(encodingList.size()); + for (Encoding encoding : encodingList) { + out.writeShort(encoding.ordinal()); + } + } + out.writeBoolean(isDimensionColumn); + out.writeInt(scale); + out.writeInt(precision); + out.writeInt(schemaOrdinal); + out.writeInt(numberOfChild); + WritableUtil.writeByteArray(out, defaultValue); + if (columnProperties == null) { + out.writeShort(0); + } else { + out.writeShort(columnProperties.size()); + for (Map.Entry<String, String> entry : columnProperties.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + out.writeBoolean(invisible); + out.writeBoolean(isSortColumn); + } + + @Override + public void readFields(DataInput in) throws IOException { + int ordinal = in.readShort(); + this.dataType = DataType.valueOf(ordinal); + this.columnName = in.readUTF(); + this.columnUniqueId = in.readUTF(); + this.columnReferenceId = in.readUTF(); + int 
encodingListSize = in.readShort(); + this.encodingList = new ArrayList<>(encodingListSize); + for (int i = 0; i < encodingListSize; i++) { + ordinal = in.readShort(); + encodingList.add(Encoding.valueOf(ordinal)); + } + this.isDimensionColumn = in.readBoolean(); + this.scale = in.readInt(); + this.precision = in.readInt(); + this.schemaOrdinal = in.readInt(); + this.numberOfChild = in.readInt(); + this.defaultValue = WritableUtil.readByteArray(in); + int mapSize = in.readShort(); + this.columnProperties = new HashMap<>(mapSize); + for (int i = 0; i < mapSize; i++) { + String key = in.readUTF(); + String value = in.readUTF(); + this.columnProperties.put(key, value); + } + this.invisible = in.readBoolean(); + this.isSortColumn = in.readBoolean(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java index 60d3c26..dc3b232 100644 --- a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java +++ b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java @@ -88,8 +88,7 @@ public class DictionaryClientTest { tableInfo.setDatabaseName("test"); storePath = System.getProperty("java.io.tmpdir") + "/tmp"; tableInfo.setStorePath(storePath); - CarbonTable carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(tableInfo); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo); // Add the created table to metadata metadata.addCarbonTable(carbonTable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java 
---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java index 49db6ce..0bac01a 100644 --- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java +++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java @@ -166,8 +166,7 @@ public class IncrementalColumnDictionaryGeneratorTest { System.out.print(dictPath.mkdirs()); tableInfo.setStorePath(storePath); - CarbonTable carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(tableInfo); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo); // Add the table to metadata metadata.addCarbonTable(carbonTable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java index b66331f..d8df99a 100644 --- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java +++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java @@ -83,8 +83,7 @@ public class ServerDictionaryGeneratorTest { tableInfo.setDatabaseName("test"); storePath = System.getProperty("java.io.tmpdir") + "/tmp"; tableInfo.setStorePath(storePath); - CarbonTable carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(tableInfo); + CarbonTable carbonTable = 
CarbonTable.buildFromTableInfo(tableInfo); // Add the created table to metadata metadata.addCarbonTable(carbonTable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java index 0cb47c4..8a68b72 100644 --- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java +++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java @@ -83,8 +83,7 @@ public class TableDictionaryGeneratorTest { tableInfo.setDatabaseName("test"); storePath = System.getProperty("java.io.tmpdir") + "/tmp"; tableInfo.setStorePath(storePath); - CarbonTable carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(tableInfo); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo); // Add the created table to metadata metadata.addCarbonTable(carbonTable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java index 56d14c2..3af0bdb 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java @@ -160,7 +160,7 @@ public class CarbonMetadataTest { } @Test public void testGetCarbonDimensionBasedOnColIdentifier() { - CarbonTable carbonTable = new CarbonTable(); + 
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L)); String columnIdentifier = "1"; final List<CarbonDimension> carbonDimensions = new ArrayList(); ColumnSchema colSchema1 = new ColumnSchema(); @@ -186,7 +186,7 @@ public class CarbonMetadataTest { @Test public void testGetCarbonDimensionBasedOnColIdentifierWhenChildDimensionColumnEqualsColumnIdentifier() { - CarbonTable carbonTable = new CarbonTable(); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L)); String columnIdentifier = "9"; final List<CarbonDimension> carbonDimensions = new ArrayList(); ColumnSchema colSchema1 = new ColumnSchema(); @@ -226,7 +226,7 @@ public class CarbonMetadataTest { } @Test public void testGetCarbonDimensionBasedOnColIdentifierNullCase() { - CarbonTable carbonTable = new CarbonTable(); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L)); String columnIdentifier = "3"; final List<CarbonDimension> carbonDimensions = new ArrayList(); ColumnSchema colSchema1 = new ColumnSchema(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java index e8851e0..3961d9c 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java @@ -1500,10 +1500,6 @@ public class ThriftWrapperSchemaConverterImplTest { @Mock public TableSchema getFactTable() { return wrapperTableSchema; } - - @Mock public List<TableSchema> getAggregateTableList() { - return 
tableSchemas; - } }; new MockUp<TableSchema>() { @@ -1529,12 +1525,11 @@ public class ThriftWrapperSchemaConverterImplTest { }; org.apache.carbondata.format.TableSchema thriftFactTable = new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol); - List<org.apache.carbondata.format.TableSchema> thriftAggTables = new ArrayList<>(); - thriftAggTables.add(thriftFactTable); org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName); org.apache.carbondata.format.TableInfo expectedResult = - new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables); + new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache + .carbondata.format.TableSchema>()); assertEquals(expectedResult, actualResult); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java index 9ed8b56..f5ffe51 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java @@ -36,8 +36,7 @@ public class CarbonTableTest extends TestCase { private CarbonTable carbonTable; @BeforeClass public void setUp() { - carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(getTableInfo(1000L)); + carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L)); } @AfterClass public void tearDown() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java 
---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java index 9c3ab6a..69cab49 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java @@ -35,8 +35,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase { private CarbonTable carbonTable; @BeforeClass public void setUp() { - carbonTable = new CarbonTable(); - carbonTable.loadCarbonTable(getTableInfo(1000L)); + carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L)); } @AfterClass public void tearDown() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 1e69648..787f571 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.mutate.data.BlockMappingVO; import org.apache.carbondata.core.scan.expression.Expression; @@ -101,51 +102,50 
@@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; - private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; + private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; + // a cache for carbon table, it will be used in task side + private CarbonTable carbonTable; + /** - * It is optional, if user does not set then it reads from store - * - * @param configuration - * @param carbonTable - * @throws IOException + * Set the `tableInfo` in `configuration` */ - public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + public static void setTableInfo(Configuration configuration, TableInfo tableInfo) throws IOException { - if (null != carbonTable) { - configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable)); + if (null != tableInfo) { + configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo)); } } - public static CarbonTable getCarbonTable(Configuration configuration) throws IOException { - String carbonTableStr = configuration.get(CARBON_TABLE); - if (carbonTableStr == null) { - populateCarbonTable(configuration); - // read it from schema file in the store - carbonTableStr = configuration.get(CARBON_TABLE); - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); - } - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); + /** + * Get TableInfo object from `configuration` + */ + private TableInfo getTableInfo(Configuration configuration) throws IOException { + String tableInfoStr = configuration.get(TABLE_INFO); + return (TableInfo) 
ObjectSerializationUtil.convertStringToObject(tableInfoStr); } /** - * this method will read the schema from the physical file and populate into CARBON_TABLE - * @param configuration - * @throws IOException + * Get the cached CarbonTable or create it by TableInfo in `configuration` */ - private static void populateCarbonTable(Configuration configuration) throws IOException { - String dirs = configuration.get(INPUT_DIR, ""); - String[] inputPaths = StringUtils.split(dirs); - if (inputPaths.length == 0) { - throw new InvalidPathException("No input paths specified in job"); + private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + if (carbonTable == null) { + // carbon table should be created either from deserialized table info (schema saved in + // hive metastore) or by reading schema in HDFS (schema saved in HDFS) + TableInfo tableInfo = getTableInfo(configuration); + CarbonTable carbonTable; + if (tableInfo != null) { + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } else { + carbonTable = SchemaReader.readCarbonTableFromStore( + getAbsoluteTableIdentifier(configuration)); + } + this.carbonTable = carbonTable; + return carbonTable; + } else { + return this.carbonTable; } - AbsoluteTableIdentifier absoluteTableIdentifier = - AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); - // read the schema file to get the absoluteTableIdentifier having the correct table id - // persisted in the schema - CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); - setCarbonTable(configuration, carbonTable); } public static void setTablePath(Configuration configuration, String tablePath) @@ -212,13 +212,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * Set list of files to access */ public static void setFilesToAccess(Configuration configuration, List<String> validFiles) { - configuration - .set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); 
+ configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); } - private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException { - return getCarbonTable(configuration).getAbsoluteTableIdentifier(); + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); + } + return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); } /** @@ -265,12 +269,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); - CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); - // this will be null in case of corrupt schema file. - if (null == carbonTable) { - throw new IOException("Missing/Corrupt schema file for table."); - } - + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); // prune partitions for filter query on partition table @@ -343,17 +342,15 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { UpdateVO invalidBlockVOForSegmentId = null; Boolean isIUDTable = false; - AbsoluteTableIdentifier absoluteTableIdentifier = - getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); //for each segment fetch blocks matching filter in Driver BTree for (String 
segmentNo : getSegmentsToAccess(job)) { List<DataRefNode> dataRefNodes = - getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, + getDataBlocksOfSegment(job, filterExpressionProcessor, identifier, filterResolver, matchedPartitions, segmentNo, cacheClient, updateStatusManager); // Get the UpdateVO for those tables on which IUD operations being performed. @@ -699,7 +696,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); - CarbonTable carbonTable = getCarbonTable(configuration); + CarbonTable carbonTable = getOrCreateCarbonTable(configuration); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java index 9aa1188..86a3326 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java @@ -74,8 +74,8 @@ public class CarbonInputMapperTest extends TestCase { Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath)); } catch (Exception e) { - Assert.assertTrue("failed", false); e.printStackTrace(); + Assert.assertTrue("failed", false); throw e; } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index 88c182e..8cad313 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -270,7 +270,6 @@ public class StoreCreator { + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); - tableInfo.setAggregateTableList(new ArrayList<TableSchema>()); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 7a1c9db..202c444 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -31,8 +31,10 @@ import org.apache.carbondata.hadoop.CarbonInputFormat; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.io.ArrayWritable; @@ -42,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable> implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination { @@ -68,6 +71,19 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable> return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf); } + private CarbonTable getCarbonTable(Configuration configuration) throws IOException { + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); + } + AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); + // read the schema file to get the absoluteTableIdentifier having the correct table id + // persisted in the schema + return SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); + } + private QueryModel getQueryModel(Configuration configuration) throws IOException { CarbonTable carbonTable = getCarbonTable(configuration); // getting the table absoluteTableIdentifier from the carbonTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala deleted file mode 100644 index 3ccd2e3..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.testsuite.createTable - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.sql.common.util.QueryTest -import org.junit.Assert -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.hadoop.CarbonInputFormat - -/** - * test functionality related the case change for database name - */ -class TestTableIdTest extends QueryTest with BeforeAndAfterAll { - - override def beforeAll: Unit = { - sql("drop table if exists carbontable") - } - - def validateTableId: Unit = { - val carbonInputFormat: CarbonInputFormat[Array[Object]] = new CarbonInputFormat[Array[Object]] - val jobConf: JobConf = new JobConf(new Configuration) - val job: Job = Job.getInstance(jobConf) - val storePath: String = storeLocation.replaceAll("\\\\", "/") - job.getConfiguration - .set("mapreduce.input.fileinputformat.inputdir", - storePath + "/default/carbontable") - val carbonTable: CarbonTable = CarbonInputFormat.getCarbonTable(job.getConfiguration) - val getAbsoluteTableIdentifier = classOf[CarbonInputFormat[Array[Object]]] - .getDeclaredMethod("getAbsoluteTableIdentifier", classOf[Configuration]) - getAbsoluteTableIdentifier.setAccessible(true) - val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteTableIdentifier - .invoke(carbonInputFormat, job.getConfiguration).asInstanceOf[AbsoluteTableIdentifier] - - Assert - .assertEquals(carbonTable.getCarbonTableIdentifier.getTableId, - absoluteTableIdentifier.getCarbonTableIdentifier.getTableId) - } - - test("test create table with database case name change") { - - try { - // table creation should be successful - sql("create table carbontable(a int, b string)stored by 'carbondata'") - assert(true) - } catch { - case ex: Exception => - assert(false) - } - 
validateTableId - } - - override def afterAll { - sql("drop table if exists carbontable") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index 106a9fd..05b3989 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -17,11 +17,14 @@ package org.apache.carbondata.spark.rdd +import java.io.{ByteArrayInputStream, DataInputStream} + import scala.reflect.ClassTag import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo} /** @@ -44,3 +47,17 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, internalCompute(split, context) } } + +/** + * This RDD contains TableInfo object which is serialized and deserialized in driver and executor + */ +abstract class CarbonRDDWithTableInfo[T: ClassTag]( + @transient sc: SparkContext, + @transient private var deps: Seq[Dependency[_]], + serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) { + + def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) = + this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo) + + def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo) +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 3868342..85c4bc4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hive.DistributionUtil @@ -34,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} @@ -53,8 +52,9 @@ class CarbonScanRDD( columnProjection: CarbonProjection, filterExpression: Expression, identifier: AbsoluteTableIdentifier, - @transient carbonTable: CarbonTable) - extends CarbonRDD[InternalRow](sc, Nil) { + serializedTableInfo: Array[Byte], + @transient tableInfo: TableInfo) + extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) { private val 
queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") private val jobTrackerId: String = { @@ -65,7 +65,7 @@ class CarbonScanRDD( private val readSupport = SparkReadSupport.readSupportClass - private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName) + private val bucketedTable = tableInfo.getFactTable.getBucketingInfo @transient private val jobId = new JobID(jobTrackerId, id) @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -174,6 +174,7 @@ class CarbonScanRDD( } override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", @@ -246,12 +247,13 @@ class CarbonScanRDD( } private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = { - CarbonInputFormat.setCarbonTable(conf, carbonTable) + CarbonInputFormat.setTableInfo(conf, tableInfo) createInputFormat(conf) } private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = { CarbonInputFormat.setCarbonReadSupport(conf, readSupport) + CarbonInputFormat.setTableInfo(conf, getTableInfo) createInputFormat(conf) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index 664cbae..954303f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ 
-266,7 +266,6 @@ object DataManagementFunc { def prepareCarbonLoadModel(storePath: String, table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = { - newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray) newCarbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index ee77f35..6174f7c 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -526,7 +526,6 @@ class TableNewProcessor(cm: TableModel) { tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) - tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]()) tableInfo } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala index 17d6065..3bd92e9 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala @@ -122,12 +122,14 @@ case class CarbonScan( 
columnProjection.foreach { attr => projection.addColumn(attr.name) } + new CarbonScanRDD( ocRaw.sparkContext, projection, buildCarbonPlan.getFilterExpression, carbonTable.getAbsoluteTableIdentifier, - carbonTable + carbonTable.getTableInfo.serialize(), + carbonTable.getTableInfo ) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 44d5efb..da6dd98 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -128,7 +128,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e val table = relation.tableMeta.carbonTable - carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray) carbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation @@ -404,7 +403,6 @@ case class LoadTable( carbonLoadModel.setStorePath(relation.tableMeta.storePath) val table = relation.tableMeta.carbonTable - carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray) carbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index d28044f..d1baf79 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.io.{ByteArrayOutputStream, DataOutputStream} + import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD @@ -28,15 +30,13 @@ import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.logical.AndExpression -import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo} import org.apache.carbondata.hadoop.CarbonProjection -import org.apache.carbondata.hadoop.util.SchemaReader -import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.CarbonFilters import org.apache.carbondata.spark.rdd.CarbonScanRDD -import org.apache.carbondata.spark.util.CarbonSparkUtil case class CarbonDatasourceHadoopRelation( sparkSession: SparkSession, @@ -46,15 +46,22 @@ case class CarbonDatasourceHadoopRelation( isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]()) extends BaseRelation with InsertableRelation { - lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head) - lazy val carbonTable = carbonRelation.tableMeta.carbonTable - lazy val carbonRelation: CarbonRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore - 
.lookupRelation(Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName), - absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession) + lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head) + lazy val databaseName: String = carbonTable.getDatabaseName + lazy val tableName: String = carbonTable.getFactTableName + lazy val carbonSessionInfo : CarbonSessionInfo = + CarbonEnv.getInstance(sparkSession).carbonSessionInfo + ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) + + @transient lazy val carbonRelation: CarbonRelation = + CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation( + Some(identifier.getCarbonTableIdentifier.getDatabaseName), + identifier.getCarbonTableIdentifier.getTableName)(sparkSession) .asInstanceOf[CarbonRelation] - val carbonSessionInfo : CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo - ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) + @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable + override def sqlContext: SQLContext = sparkSession.sqlContext override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema) @@ -67,15 +74,20 @@ case class CarbonDatasourceHadoopRelation( val projection = new CarbonProjection requiredColumns.foreach(projection.addColumn) - new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull, - absIdentifier, carbonTable) + new CarbonScanRDD( + sqlContext.sparkContext, + projection, + filterExpression.orNull, + identifier, + carbonTable.getTableInfo.serialize(), + carbonTable.getTableInfo) } override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0) override def toString: String = { - "CarbonDatasourceHadoopRelation [ " + "Database name :" + carbonTable.getDatabaseName + - ", " + "Table name :" + carbonTable.getFactTableName + ", Schema :" + tableSchema + " ]" + "CarbonDatasourceHadoopRelation [ " + 
"Database name :" + databaseName + + ", " + "Table name :" + tableName + ", Schema :" + tableSchema + " ]" } override def sizeInBytes: Long = carbonRelation.sizeInBytes http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index bd1c8b1..33091aa 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.spark.CarbonAliasDecoderRelation -import org.apache.carbondata.spark.rdd.CarbonRDD +import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonRDDWithTableInfo} /** * It decodes the data. 
@@ -444,10 +444,9 @@ class CarbonDecoderRDD( aliasMap: CarbonAliasDecoderRelation, prev: RDD[InternalRow], output: Seq[Attribute], - sparkSession: SparkSession) - extends CarbonRDD[InternalRow](prev) { - - private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + storePath: String, + serializedTableInfo: Array[Byte]) + extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) { def canBeDecoded(attr: Attribute): Boolean = { profile match { @@ -516,13 +515,13 @@ class CarbonDecoderRDD( override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val absoluteTableIdentifiers = relations.map { relation => - val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable - (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) + val tableInfo = getTableInfo + (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier) }.toMap val cacheProvider: CacheProvider = CacheProvider.getInstance val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = - cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath) + cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath) val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers, forwardDictionaryCache) val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2) http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index a206bef..1cc6668 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.execution -import java.text.SimpleDateFormat -import java.util.Date - import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -34,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.{AtomicType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator import org.apache.carbondata.core.metadata.schema.BucketingInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.{CarbonAliasDecoderRelation} +import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -99,8 +95,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { relation.addAttribute(newAttr) newAttr } - new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs), - CarbonAliasDecoderRelation(), rdd, output, SparkSession.getActiveSession.get) + + new CarbonDecoderRDD( + Seq(relation), + IncludeProfile(attrs), + CarbonAliasDecoderRelation(), + rdd, + output, + CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath, + table.carbonTable.getTableInfo.serialize()) } private[this] def toCatalystRDD( 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index 0894f23..2fccd0c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -795,8 +795,8 @@ object UpdateExecution { def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = { - val tableName = relation.absIdentifier.getCarbonTableIdentifier.getTableName - val dbName = relation.absIdentifier.getCarbonTableIdentifier.getDatabaseName + val tableName = relation.identifier.getCarbonTableIdentifier.getTableName + val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName (tableIdentifier.size > 1 && tableIdentifier(0) == dbName && tableIdentifier(1) == tableName) || @@ -841,8 +841,8 @@ object UpdateExecution { val header = getHeader(carbonRelation, plan) LoadTable( - Some(carbonRelation.absIdentifier.getCarbonTableIdentifier.getDatabaseName), - carbonRelation.absIdentifier.getCarbonTableIdentifier.getTableName, + Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName), + carbonRelation.identifier.getCarbonTableIdentifier.getTableName, null, Seq(), Map(("fileheader" -> header)), http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 4b22cea..ce66733 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -126,7 +126,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab val carbonLoadModel = new CarbonLoadModel() val table = relation.tableMeta.carbonTable - carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray) carbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation @@ -495,7 +494,6 @@ case class LoadTable( carbonLoadModel.setStorePath(relation.tableMeta.storePath) val table = relation.tableMeta.carbonTable - carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray) carbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index bfc1be9..b60085b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -488,13 +488,6 @@ public class CarbonLoadModel implements Serializable { } /** - * @param aggTables the aggTables to set - */ - public void setAggTables(String[] aggTables) { - this.aggTables = aggTables; - } - - /** * @param storePath The storePath to set. 
*/ public void setStorePath(String storePath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b69c9d8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 91cc195..a7c2057 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -261,7 +261,6 @@ public class StoreCreator { + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); - tableInfo.setAggregateTableList(new ArrayList<TableSchema>()); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
