Rebase datamap branch onto master. This closes #1196.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/79feac96 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/79feac96 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/79feac96 Branch: refs/heads/master Commit: 79feac96ae789851c5ad7306a7acaaba25d8e6c9 Parents: b681244 Author: Raghunandan S <[email protected]> Authored: Thu Jul 27 20:38:48 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Jul 27 20:41:22 2017 +0800 ---------------------------------------------------------------------- .../core/indexstore/UnsafeMemoryDMStore.java | 31 +- .../blockletindex/BlockletDataMap.java | 11 +- .../core/indexstore/row/DataMapRow.java | 4 +- .../core/indexstore/row/DataMapRowImpl.java | 4 + .../core/indexstore/row/UnsafeDataMapRow.java | 40 +- .../core/memory/UnsafeMemoryManager.java | 19 +- .../datatype/DecimalConverterFactory.java | 2 +- .../hadoop/api/CarbonTableInputFormat.java | 83 ++-- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +- .../execution/command/carbonTableSchema.scala | 6 +- .../spark/sql/hive/CarbonFileMetastore.scala | 392 +++++++++---------- 11 files changed, 301 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 8246f99..13951dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -19,9 +19,10 @@ package org.apache.carbondata.core.indexstore; import 
org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; import org.apache.carbondata.core.indexstore.schema.DataMapSchema; -import org.apache.carbondata.core.memory.MemoryAllocator; -import org.apache.carbondata.core.memory.MemoryAllocatorFactory; import org.apache.carbondata.core.memory.MemoryBlock; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe; @@ -39,8 +40,6 @@ public class UnsafeMemoryDMStore { private int runningLength; - private MemoryAllocator memoryAllocator; - private boolean isMemoryFreed; private DataMapSchema[] schema; @@ -49,11 +48,13 @@ public class UnsafeMemoryDMStore { private int rowCount; - public UnsafeMemoryDMStore(DataMapSchema[] schema) { + private final long taskId = null != ThreadLocalTaskInfo.getCarbonTaskInfo() ? 
+ ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId() : System.nanoTime(); + + public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException { this.schema = schema; - this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator(); this.allocatedSize = capacity; - this.memoryBlock = memoryAllocator.allocate(allocatedSize); + this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize); this.pointers = new int[1000]; } @@ -63,13 +64,13 @@ public class UnsafeMemoryDMStore { * * @param rowSize */ - private void ensureSize(int rowSize) { + private void ensureSize(int rowSize) throws MemoryException { if (runningLength + rowSize >= allocatedSize) { MemoryBlock allocate = - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity); + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity); unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); allocatedSize = allocatedSize + capacity; memoryBlock = allocate; } @@ -86,7 +87,7 @@ public class UnsafeMemoryDMStore { * @param indexRow * @return */ - public void addIndexRowToUnsafe(DataMapRow indexRow) { + public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException { // First calculate the required memory to keep the row in unsafe int rowSize = indexRow.getTotalSizeInBytes(); // Check whether allocated memory is sufficient or not. 
@@ -168,13 +169,13 @@ public class UnsafeMemoryDMStore { return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); } - public void finishWriting() { + public void finishWriting() throws MemoryException { if (runningLength < allocatedSize) { MemoryBlock allocate = - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength); + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength); unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); memoryBlock = allocate; } // Compact pointers. @@ -187,7 +188,7 @@ public class UnsafeMemoryDMStore { public void freeMemory() { if (!isMemoryFreed) { - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); isMemoryFreed = true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 79aa091..680852d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; import org.apache.carbondata.core.indexstore.schema.DataMapSchema; import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; 
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; @@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable { if (unsafeMemoryDMStore != null) { unsafeMemoryDMStore.finishWriting(); } - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable { DataOutput dataOutput = new DataOutputStream(stream); blockletInfo.write(dataOutput); serializedData = stream.toByteArray(); - } catch (IOException e) { + row.setByteArray(serializedData, ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } catch (Exception e) { throw new RuntimeException(e); } - row.setByteArray(serializedData, ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); } } @@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable { return minRow; } - private void createSchema(SegmentProperties segmentProperties) { + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { List<DataMapSchema> indexSchemas = new ArrayList<>(); // Index key http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index defe766..631e0ad 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -62,6 +62,8 @@ public abstract class DataMapRow { public abstract double getDouble(int ordinal); + public abstract int getLengthInBytes(int ordinal); + public int getTotalSizeInBytes() { int len = 0; for (int i = 0; i < 
schemas.length; i++) { @@ -75,7 +77,7 @@ public abstract class DataMapRow { case FIXED: return schemas[ordinal].getLength(); case VARIABLE: - return getByteArray(ordinal).length + 2; + return getLengthInBytes(ordinal) + 2; case STRUCT: return getRow(ordinal).getTotalSizeInBytes(); default: http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java index adec346..32d15d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java @@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow { return (byte[]) data[ordinal]; } + @Override public int getLengthInBytes(int ordinal) { + return ((byte[]) data[ordinal]).length; + } + @Override public DataMapRow getRow(int ordinal) { return (DataMapRow) data[ordinal]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index ef78514..c398115 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow { return data; } + @Override public int getLengthInBytes(int ordinal) { + int length; + int position = getPosition(ordinal); + switch 
(schemas[ordinal].getSchemaType()) { + case VARIABLE: + length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position); + break; + default: + length = schemas[ordinal].getLength(); + } + return length; + } + + private int getLengthInBytes(int ordinal, int position) { + int length; + switch (schemas[ordinal].getSchemaType()) { + case VARIABLE: + length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position); + break; + default: + length = schemas[ordinal].getLength(); + } + return length; + } + @Override public DataMapRow getRow(int ordinal) { DataMapSchema[] childSchemas = ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas(); @@ -123,10 +148,23 @@ public class UnsafeDataMapRow extends DataMapRow { throw new UnsupportedOperationException("Not supported to set on unsafe row"); } + private int getSizeInBytes(int ordinal, int position) { + switch (schemas[ordinal].getSchemaType()) { + case FIXED: + return schemas[ordinal].getLength(); + case VARIABLE: + return getLengthInBytes(ordinal, position) + 2; + case STRUCT: + return getRow(ordinal).getTotalSizeInBytes(); + default: + throw new UnsupportedOperationException("wrong type"); + } + } + private int getPosition(int ordinal) { int position = 0; for (int i = 0; i < ordinal; i++) { - position += getSizeInBytes(i); + position += getSizeInBytes(i, position); } return position; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 991bc90..d433b5e 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -90,33 +90,28 @@ public class UnsafeMemoryManager { if (memoryUsed + memoryRequested <= totalMemory) { MemoryBlock allocate = allocator.allocate(memoryRequested); memoryUsed += allocate.size(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Working Memory block (" + allocate + ") is created with size " + allocate.size() - + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) - + "Bytes"); - } Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); if (null == listOfMemoryBlock) { listOfMemoryBlock = new HashSet<>(); taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); } listOfMemoryBlock.add(allocate); + LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size() + + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) + + "Bytes"); return allocate; } return null; } - public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) { + public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) { taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); allocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + ( - totalMemory - memoryUsed)); - } + LOGGER.info( + "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory + - memoryUsed)); } public void freeMemoryAll(long taskId) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java index 555df1c..459eb24 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java @@ -29,7 +29,7 @@ import org.apache.carbondata.core.util.DataTypeUtil; */ public final class DecimalConverterFactory { - public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory(); + public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory(); private int[] minBytesForPrecision = minBytesForPrecision(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index e73c04a..9e6e284 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -38,6 +38,7 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.UpdateVO; @@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; - private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; + private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; + // a cache for carbon table, it will be used in task side + private CarbonTable carbonTable; + /** - * It is optional, if user does not set then it reads from store - * - * @param configuration - * @param carbonTable - * @throws IOException + * Set the `tableInfo` in `configuration` */ - public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + public static void setTableInfo(Configuration configuration, TableInfo tableInfo) throws IOException { - if (null != carbonTable) { - configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable)); + if (null != tableInfo) { + configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo)); } } - public static CarbonTable getCarbonTable(Configuration configuration) throws IOException { - String carbonTableStr = configuration.get(CARBON_TABLE); - if (carbonTableStr == null) { - 
populateCarbonTable(configuration); - // read it from schema file in the store - carbonTableStr = configuration.get(CARBON_TABLE); - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); - } - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); + /** + * Get TableInfo object from `configuration` + */ + private TableInfo getTableInfo(Configuration configuration) throws IOException { + String tableInfoStr = configuration.get(TABLE_INFO); + return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr); } /** - * this method will read the schema from the physical file and populate into CARBON_TABLE - * - * @param configuration - * @throws IOException + * Get the cached CarbonTable or create it by TableInfo in `configuration` */ - private static void populateCarbonTable(Configuration configuration) throws IOException { - String dirs = configuration.get(INPUT_DIR, ""); - String[] inputPaths = StringUtils.split(dirs); - if (inputPaths.length == 0) { - throw new InvalidPathException("No input paths specified in job"); + private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + if (carbonTable == null) { + // carbon table should be created either from deserialized table info (schema saved in + // hive metastore) or by reading schema in HDFS (schema saved in HDFS) + TableInfo tableInfo = getTableInfo(configuration); + CarbonTable carbonTable; + if (tableInfo != null) { + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } else { + carbonTable = SchemaReader.readCarbonTableFromStore( + getAbsoluteTableIdentifier(configuration)); + } + this.carbonTable = carbonTable; + return carbonTable; + } else { + return this.carbonTable; } - AbsoluteTableIdentifier absoluteTableIdentifier = - AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); - // read the schema file to get the absoluteTableIdentifier having the correct table id - // persisted in the schema - 
CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); - setCarbonTable(configuration, carbonTable); } public static void setTablePath(Configuration configuration, String tablePath) throws IOException { configuration.set(FileInputFormat.INPUT_DIR, tablePath); } - /** * It sets unresolved filter expression. * @@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); } - private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException { - return getCarbonTable(configuration).getAbsoluteTableIdentifier(); + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); + } + return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); } /** @@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); - CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); // this will be null in case of corrupt schema file. 
if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); @@ -277,7 +280,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { if (null != partitionInfo) { Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo); matchedPartitions = new FilterExpressionProcessor() - .getFilteredPartitions(filter, partitionInfo, partitioner); + .getFilteredPartitions(filter, partitionInfo); if (matchedPartitions.cardinality() == 0) { // no partition is required return new ArrayList<InputSplit>(); @@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { Boolean isIUDTable = false; AbsoluteTableIdentifier absoluteTableIdentifier = - getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); + getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier); @@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); - CarbonTable carbonTable = getCarbonTable(configuration); + CarbonTable carbonTable = getOrCreateCarbonTable(configuration); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 1a8183c..add0578 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V]( val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) - CarbonInputFormat.setTableInfo(job.getConfiguration, + CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) var updateDetails: UpdateVO = null // initialise query_id for job http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index f3baf58..d34b91d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -268,7 +268,7 @@ case class DeleteLoadsById( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
tableMeta.carbonTable CarbonStore.deleteLoadById( loadids, @@ -293,7 +293,7 @@ case class DeleteLoadsByLoadDate( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. tableMeta.carbonTable CarbonStore.deleteLoadByDate( loadDate, @@ -847,7 +847,7 @@ case class ShowLoads( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. tableMeta.carbonTable CarbonStore.showSegments( getDB.getDatabaseName(databaseNameOp, sparkSession), http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 549841b..c9eaf6d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{RuntimeConfig, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} import 
org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.schema +import org.apache.carbondata.core.metadata.schema.table import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore { +class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { @transient val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca 
System.nanoTime() + "" } - lazy val metadata = loadMetadata(storePath, nextQueryId) + val metadata = MetaData(new ArrayBuffer[TableMeta]()) /** @@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca override def createCarbonRelation(parameters: Map[String, String], absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { - lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName, - Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession) - .asInstanceOf[CarbonRelation] + val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName + val tableName = absIdentifier.getCarbonTableIdentifier.getTableName + val tables = getTableFromMetadataCache(database, tableName) + tables match { + case Some(t) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(t.carbonTable), t) + case None => + readCarbonSchema(absIdentifier) match { + case Some(meta) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta) + case None => + throw new NoSuchTableException(database, tableName) + } + } } def lookupRelation(dbName: Option[String], tableName: String) @@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } - def lookupRelation(tableIdentifier: TableIdentifier) + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { - checkSchemasModifiedTimeAndReloadTables() val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase - ) - val tables = getTableFromMetadata(database, tableIdentifier.table, true) - tables match { - case Some(t) => - CarbonRelation(database, tableIdentifier.table, - CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head) - case None => - throw new 
NoSuchTableException(database, tableIdentifier.table) + sparkSession.catalog.currentDatabase) + val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), + _) => + carbonDatasourceHadoopRelation.carbonRelation + case LogicalRelation( + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + carbonDatasourceHadoopRelation.carbonRelation + case _ => throw new NoSuchTableException(database, tableIdentifier.table) } + relation } /** @@ -123,11 +138,10 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName * @return */ - def getTableFromMetadata(database: String, - tableName: String, readStore: Boolean = false): Option[TableMeta] = { + def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = { metadata.tablesMeta .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) + c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) } def tableExists( @@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableExists(TableIdentifier(table, databaseOp))(sparkSession) } - def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table)) - tables.nonEmpty - } - - def loadMetadata(metadataPath: String, queryId: String): MetaData = { - val recorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - // 
creating zookeeper instance once. - // if zookeeper is configured as carbon lock type. - val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) - if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) - } - if (metadataPath == null) { - return null - } - // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && - FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, - CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS - ) - LOGGER.info("Default lock type HDFSLOCK is configured") + override def tableExists(tableIdentifier: TableIdentifier) + (sparkSession: SparkSession): Boolean = { + try { + lookupRelation(tableIdentifier)(sparkSession) + } catch { + case e: Exception => + return false } - val fileType = FileFactory.getFileType(metadataPath) - val metaDataBuffer = new ArrayBuffer[TableMeta] - fillMetaData(metadataPath, fileType, metaDataBuffer) - updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) - statistic.addStatistics(QueryStatisticsConstants.LOAD_META, - System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - MetaData(metaDataBuffer) + true } - private def fillMetaData(basePath: String, fileType: FileType, - metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { - val databasePath = basePath // + "/schemas" - try { - if (FileFactory.isFileExist(databasePath, fileType)) { - val file = FileFactory.getCarbonFile(databasePath, fileType) - val databaseFolders = file.listFiles() - - databaseFolders.foreach(databaseFolder => { - if (databaseFolder.isDirectory) { - val dbName = databaseFolder.getName - val tableFolders = databaseFolder.listFiles() - - tableFolders.foreach(tableFolder => { - if (tableFolder.isDirectory) { - val carbonTableIdentifier = new 
CarbonTableIdentifier(databaseFolder.getName, - tableFolder.getName, UUID.randomUUID().toString) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, - carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableName = tableFolder.getName - val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) - } - } - }) - } - }) - } else { - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } catch { - case s: java.io.FileNotFoundException => - s.printStackTrace() - // Create folders and files. 
- FileFactory.mkdirs(databasePath, fileType) + private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = { + val dbName = identifier.getCarbonTableIdentifier.getDatabaseName + val tableName = identifier.getCarbonTableIdentifier.getTableName + val storePath = identifier.getStorePath + val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(), + tableName.toLowerCase(), UUID.randomUUID().toString) + val carbonTablePath = + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableUniqueName = dbName + "_" + tableName + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath) + val schemaFilePath = CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath + wrapperTableInfo.setStorePath(storePath) + wrapperTableInfo + .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier, + identifier.getStorePath, + identifier.getTablePath, + carbonTable) + metadata.tablesMeta += tableMeta + Some(tableMeta) + } else { + None } } @@ -238,28 +201,36 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: 
CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String) (sparkSession: SparkSession): String = { + val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, - newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName, - carbonStorePath) - createSchemaThriftFile(wrapperTableInfo, + newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName, + absoluteTableIdentifier.getStorePath) + val identifier = + new CarbonTableIdentifier(newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName, + wrapperTableInfo.getFactTable.getTableId) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName)(sparkSession) + identifier) + addTableCache(wrapperTableInfo, + AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath, + newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName)) + path } /** @@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param carbonTableIdentifier * @param thriftTableInfo - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String)(sparkSession: SparkSession): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new 
ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName, carbonTableIdentifier.getTableName, - carbonStorePath) + tableIdentifier.getStorePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - createSchemaThriftFile(wrapperTableInfo, + wrapperTableInfo.setStorePath(tableIdentifier.getStorePath) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName)(sparkSession) + tableIdentifier.getCarbonTableIdentifier) + addTableCache(wrapperTableInfo, tableIdentifier) + path } @@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * Load CarbonTable from wrapper tableInfo * */ - def createTableFromThrift( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = { - if (tableExists(tableName, Some(dbName))(sparkSession)) { - sys.error(s"Table [$tableName] already exists under Database [$dbName]") - } - val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) + def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val dbName = tableInfo.getDatabaseName + val tableName = tableInfo.getFactTable.getTableName val thriftTableInfo = schemaConverter .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history - .add(schemaEvolutionEntry) - val carbonTablePath = createSchemaThriftFile(tableInfo, + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + tableInfo.setStorePath(identifier.getStorePath) + createSchemaThriftFile(tableInfo, thriftTableInfo, - 
dbName, - tableName)(sparkSession) + identifier.getCarbonTableIdentifier) LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - (carbonTablePath, "") + } + + /** + * Generates schema string from TableInfo + */ + override def generateTableSchemaString(tableInfo: schema.table.TableInfo, + tablePath: String): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + val schemaEvolutionEntry = new schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) + addTableCache(tableInfo, tableIdentifier) + CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") } /** @@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param tableInfo * @param thriftTableInfo - * @param dbName - * @param tableName - * @param sparkSession * @return */ - private def createSchemaThriftFile( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - dbName: String, tableName: String) - (sparkSession: SparkSession): String = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + private def createSchemaThriftFile(tableInfo: schema.table.TableInfo, + thriftTableInfo: TableInfo, + carbonTableIdentifier: 
CarbonTableIdentifier): String = { + val carbonTablePath = CarbonStorePath. + getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier) val schemaFilePath = carbonTablePath.getSchemaFilePath val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { FileFactory.mkdirs(schemaMetadataPath, fileType) @@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca thriftWriter.open(FileWriteOperation.OVERWRITE) thriftWriter.write(thriftTableInfo) thriftWriter.close() - removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath)) + carbonTablePath.getPath + } + + protected def addTableCache(tableInfo: table.TableInfo, + absoluteTableIdentifier: AbsoluteTableIdentifier) = { + val identifier = absoluteTableIdentifier.getCarbonTableIdentifier + CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName) + removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, - CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)) + val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath, + absoluteTableIdentifier.getTablePath, + CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)) metadata.tablesMeta += tableMeta - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - carbonTablePath.getPath } /** @@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName */ def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - val metadataToBeRemoved: 
Option[TableMeta] = getTableFromMetadata(dbName, tableName) + val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName) metadataToBeRemoved match { case Some(tableMeta) => metadata.tablesMeta -= tableMeta CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) case None => - LOGGER.debug(s"No entry for table $tableName in database $dbName") + if (LOGGER.isDebugEnabled) { + LOGGER.debug(s"No entry for table $tableName in database $dbName") + } } } @@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tableName = tableIdentifier.table.toLowerCase - - val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, - new CarbonTableIdentifier(dbName, tableName, "")).getPath - - val fileType = FileFactory.getFileType(tablePath) - FileFactory.isFileExist(tablePath, fileType) + try { + val tablePath = lookupRelation(tableIdentifier)(sparkSession). 
+ asInstanceOf[CarbonRelation].tableMeta.tablePath + val fileType = FileFactory.getFileType(tablePath) + FileFactory.isFileExist(tablePath, fileType) + } catch { + case e: Exception => + false + } } - def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) + def dropTable(tablePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession) { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table - - val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, - new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache @@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca if (FileFactory.isFileExist(metadataFilePath, fileType)) { // while drop we should refresh the schema modified time so that if any thing has changed // in the other beeline need to update. 
- checkSchemasModifiedTimeAndReloadTables - - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, - tableIdentifier.table) - metadataToBeRemoved match { - case Some(tableMeta) => - metadata.tablesMeta -= tableMeta - CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - case None => - LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName") - } + checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) + + removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath)) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } } - private def getTimestampFileAndType(databaseName: String, tableName: String) = { - val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE + private def getTimestampFileAndType(basePath: String) = { + val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE val timestampFileType = FileFactory.getFileType(timestampFile) (timestampFile, timestampFileType) } @@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableModifiedTimeStore.put("default", timeStamp) } - def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) { - updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName)) + def updateAndTouchSchemasUpdatedTime(basePath: String) { + updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath)) } - /** - * This method will read the timestamp of empty schema file - * - * @param databaseName - * @param tableName - * @return - */ - private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, 
timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime - } else { - System.currentTimeMillis() - } - } /** * This method will check and create an empty schema timestamp file * - * @param databaseName - * @param tableName * @return */ - private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) + private def touchSchemaFileSystemTime(basePath: String): Long = { + val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath) if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName") + LOGGER.audit(s"Creating timestamp file for $basePath") FileFactory.createNewFile(timestampFile, timestampFileType) } val systemTime = System.currentTimeMillis() @@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca systemTime } - def checkSchemasModifiedTimeAndReloadTables() { - val (timestampFile, timestampFileType) = getTimestampFileAndType("", "") + def checkSchemasModifiedTimeAndReloadTables(storePath: String) { + val (timestampFile, timestampFileType) = + getTimestampFileAndType(storePath) if (FileFactory.isFileExist(timestampFile, timestampFileType)) { if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType). 
getLastModifiedTime == @@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca } private def refreshCache() { - metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta + metadata.tablesMeta.clear() } override def isReadFromHiveMetaStore: Boolean = false @@ -527,4 +492,3 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca CarbonUtil.readSchemaFile(tableMetadataFile) } } -
