This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 56879f747d150f4efae0f2998a38f4297706bc5e Author: kunal642 <kunalkapoor...@gmail.com> AuthorDate: Fri Jul 26 14:52:36 2019 +0530 [CARBONDATA-3480] Fixed unnecessary refresh for table by removing modified mdt file This closes #3339 --- .../carbondata/core/datamap/DataMapFilter.java | 47 +++ .../core/datamap/DataMapStoreManager.java | 14 +- .../carbondata/core/metadata/CarbonMetadata.java | 9 + .../core/metadata/schema/table/CarbonTable.java | 4 +- .../core/metadata/schema/table/TableSchema.java | 4 + .../statusmanager/SegmentUpdateStatusManager.java | 26 -- .../apache/carbondata/core/util/CarbonUtil.java | 1 - .../core/metadata/CarbonMetadataTest.java | 7 +- .../ThriftWrapperSchemaConverterImplTest.java | 4 +- .../metadata/schema/table/CarbonTableTest.java | 8 +- .../table/CarbonTableWithComplexTypesTest.java | 6 +- .../dblocation/DBLocationCarbonTableTestCase.scala | 25 -- .../apache/spark/sql/hive/CarbonSessionUtil.scala | 6 +- .../carbondata/indexserver/IndexServer.scala | 10 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 51 ++- .../command/datamap/CarbonDropDataMapCommand.scala | 1 - .../management/RefreshCarbonTableCommand.scala | 2 - .../CarbonAlterTableDropPartitionCommand.scala | 12 +- .../CarbonAlterTableSplitPartitionCommand.scala | 3 - .../command/preaaggregate/PreAggregateUtil.scala | 19 +- .../command/table/CarbonDropTableCommand.scala | 13 + .../spark/sql/hive/CarbonFileMetastore.scala | 425 +++++++++------------ .../spark/sql/hive/CarbonHiveMetaStore.scala | 10 +- .../apache/spark/sql/hive/CarbonMetaStore.scala | 10 +- .../scala/org/apache/spark/util/CleanFiles.scala | 3 - .../scala/org/apache/spark/util/Compaction.scala | 2 - .../apache/spark/util/DeleteSegmentByDate.scala | 2 - .../org/apache/spark/util/DeleteSegmentById.scala | 2 - .../scala/org/apache/spark/util/TableLoader.scala | 2 - .../apache/spark/sql/hive/CarbonSessionState.scala | 31 +- .../AlterTableColumnRenameTestCase.scala | 4 +- 31 files changed, 322 insertions(+), 441 
deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java index c20d0d5..ac4886d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java @@ -18,10 +18,15 @@ package org.apache.carbondata.core.datamap; import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.executor.util.RestructureUtil; +import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -39,9 +44,51 @@ public class DataMapFilter implements Serializable { public DataMapFilter(CarbonTable table, Expression expression) { this.table = table; this.expression = expression; + if (expression != null) { + checkIfFilterColumnExistsInTable(); + } resolve(); } + private Set<String> extractColumnExpressions(Expression expression) { + Set<String> columnExpressionList = new HashSet<>(); + for (Expression expressions: expression.getChildren()) { + if (expressions != null && expressions.getChildren() != null + && expressions.getChildren().size() > 0) { + columnExpressionList.addAll(extractColumnExpressions(expressions)); + } else if (expressions instanceof ColumnExpression) { + columnExpressionList.add(((ColumnExpression) expressions).getColumnName()); + } + } + return columnExpressionList; + } + + private void checkIfFilterColumnExistsInTable() { + Set<String> columnExpressionList = 
extractColumnExpressions(expression); + for (String colExpression : columnExpressionList) { + if (colExpression.equalsIgnoreCase("positionid")) { + continue; + } + boolean exists = false; + for (CarbonMeasure carbonMeasure : table.getAllMeasures()) { + if (!carbonMeasure.isInvisible() && carbonMeasure.getColName() + .equalsIgnoreCase(colExpression)) { + exists = true; + } + } + for (CarbonDimension carbonDimension : table.getAllDimensions()) { + if (!carbonDimension.isInvisible() && carbonDimension.getColName() + .equalsIgnoreCase(colExpression)) { + exists = true; + } + } + if (!exists) { + throw new RuntimeException( + "Column " + colExpression + " not found in table " + table.getTableUniqueName()); + } + } + } + public DataMapFilter(FilterResolverIntf resolver) { this.resolver = resolver; } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index a6a2031..ce0d6a6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -699,8 +699,13 @@ public final class DataMapStoreManager { SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails(); for (SegmentUpdateDetails updateDetails : updateStatusDetails) { UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName()); - segmentRefreshTime.put(updateVO.getSegmentId(), - new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0)); + SegmentRefreshInfo segmentRefreshInfo; + if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) { + segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0); + } else { + segmentRefreshInfo = new SegmentRefreshInfo(0L, 0); + } + segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo); } } @@ -708,8 +713,11 @@ public final class 
DataMapStoreManager { SegmentRefreshInfo segmentRefreshInfo = seg.getSegmentRefreshInfo(updateVo); String segmentId = seg.getSegmentNo(); + if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) { + return false; + } if (segmentRefreshTime.get(segmentId) == null - && segmentRefreshInfo.getSegmentUpdatedTimestamp() != null) { + && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) { segmentRefreshTime.put(segmentId, segmentRefreshInfo); return true; } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java index e44092e..9c10a03 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.metadata; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -180,4 +181,12 @@ public final class CarbonMetadata { } return null; } + + public List<CarbonTable> getAllTables() { + return new ArrayList<>(tableInfoMap.values()); + } + + public void clearAll() { + tableInfoMap.clear(); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 1be1624..aa82b64 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; @@ -286,7 +287,8 @@ public class CarbonTable implements Serializable, Writable { * @return */ public 
static String buildUniqueName(String databaseName, String tableName) { - return databaseName + CarbonCommonConstants.UNDERSCORE + tableName; + return (databaseName + CarbonCommonConstants.UNDERSCORE + tableName).toLowerCase( + Locale.getDefault()); } /** diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java index 4425697..61b2987 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java @@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -142,6 +143,9 @@ public class TableSchema implements Serializable, Writable { * @param tableName the tableName to set */ public void setTableName(String tableName) { + if (tableName != null) { + tableName = tableName.toLowerCase(Locale.getDefault()); + } this.tableName = tableName; } diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index eace1b7..f7083dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -775,32 +775,6 @@ public class SegmentUpdateStatusManager { } /** - * Returns the invalid timestamp range of a segment. 
- * @return - */ - public List<UpdateVO> getInvalidTimestampRange() { - List<UpdateVO> ranges = new ArrayList<UpdateVO>(); - for (LoadMetadataDetails segment : segmentDetails) { - if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() - || SegmentStatus.COMPACTED == segment.getSegmentStatus() - || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) { - UpdateVO range = new UpdateVO(); - range.setSegmentId(segment.getLoadName()); - range.setFactTimestamp(segment.getLoadStartTime()); - if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && - !segment.getUpdateDeltaEndTimestamp().isEmpty()) { - range.setUpdateDeltaStartTimestamp( - CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp())); - range.setLatestUpdateTimestamp( - CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp())); - } - ranges.add(range); - } - } - return ranges; - } - - /** * * @param block * @param needCompleteList diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index a86690c..cb23e3e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2239,7 +2239,6 @@ public final class CarbonUtil { org.apache.carbondata.format.TableInfo tableInfo = new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache.carbondata.format.TableSchema>()); - tableInfo.setDataMapSchemas(null); return tableInfo; } diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java index 0ec19f2..6d8dac6 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java @@ -47,6 +47,7 @@ public class CarbonMetadataTest { @BeforeClass 
public static void setUp() { carbonMetadata = CarbonMetadata.getInstance(); + carbonMetadata.clearAll(); carbonMetadata.loadTableMetadata(getTableInfo(10000)); tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable"); } @@ -77,13 +78,13 @@ public class CarbonMetadataTest { @Test public void testGetCarbonTableReturingProperTableWithProperDimensionCount() { int expectedResult = 1; assertEquals(expectedResult, - carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbonTestTable")); + carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbontesttable")); } @Test public void testGetCarbonTableReturingProperTableWithProperMeasureCount() { int expectedResult = 1; assertEquals(expectedResult, - carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbonTestTable")); + carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbontesttable")); } @Test public void testGetCarbonTableReturingProperTableWithProperDatabaseName() { @@ -92,7 +93,7 @@ public class CarbonMetadataTest { } @Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() { - String expectedResult = "carbonTestTable"; + String expectedResult = "carbontesttable"; assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName()); } diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java index 897c3cf..f03e193 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java @@ -1704,7 +1704,7 @@ public class ThriftWrapperSchemaConverterImplTest { @Test public void testFromExternalToWrapperTableSchema() { String tableId = "1"; - String 
tableName = "tableName"; + String tableName = "tablename"; TableSchema actualResult = thriftWrapperSchemaConverter.fromExternalToWrapperTableSchema(tabSchema, "tableName"); assertEquals(tableId, actualResult.getTableId()); @@ -1729,7 +1729,7 @@ public class ThriftWrapperSchemaConverterImplTest { TableInfo actualResult = thriftWrapperSchemaConverter .fromExternalToWrapperTableInfo(externalTableInfo, "dbName", "tableName", "/path"); assertEquals(time, actualResult.getLastUpdatedTime()); - assertEquals("dbName_tableName", actualResult.getTableUniqueName()); + assertEquals("dbname_tablename", actualResult.getTableUniqueName()); } } \ No newline at end of file diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java index ec1303f..0f9e252 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java @@ -44,11 +44,11 @@ public class CarbonTableTest extends TestCase { } @Test public void testNumberOfDimensionReturnsProperCount() { - assertEquals(1, carbonTable.getNumberOfDimensions("carbonTestTable")); + assertEquals(1, carbonTable.getNumberOfDimensions("carbontesttable")); } @Test public void testNumberOfMeasureReturnsProperCount() { - assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable")); + assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable")); } @Test public void testGetDatabaseNameResturnsDatabaseName() { @@ -56,7 +56,7 @@ public class CarbonTableTest extends TestCase { } @Test public void testFactTableNameReturnsProperFactTableName() { - assertEquals("carbonTestTable", carbonTable.getTableName()); + assertEquals("carbontesttable", carbonTable.getTableName()); } @Test public void testTableUniqueNameIsProper() { @@ -65,7 +65,7 @@ public class CarbonTableTest extends TestCase 
{ @Test public void testDimensionPresentInTableIsProper() { CarbonDimension dimension = new CarbonDimension(getColumnarDimensionColumn(), 0, -1, -1); - assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension)); + assertTrue(carbonTable.getDimensionByName("carbontesttable", "IMEI").equals(dimension)); } static ColumnSchema getColumnarDimensionColumn() { diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java index d3403d5..0d0d4df 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java @@ -43,11 +43,11 @@ public class CarbonTableWithComplexTypesTest extends TestCase { } @Test public void testNumberOfDimensionReturnsProperCount() { - assertEquals(2, carbonTable.getNumberOfDimensions("carbonTestTable")); + assertEquals(2, carbonTable.getNumberOfDimensions("carbontesttable")); } @Test public void testNumberOfMeasureReturnsProperCount() { - assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable")); + assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable")); } @Test public void testGetDatabaseNameResturnsDatabaseName() { @@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase { } @Test public void testFactTableNameReturnsProperFactTableName() { - assertEquals("carbonTestTable", carbonTable.getTableName()); + assertEquals("carbontesttable", carbonTable.getTableName()); } @Test public void testTableUniqueNameIsProper() { diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala index 37ad08c..236dcfc 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala @@ -261,31 +261,6 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach { sql("drop table carbontable") } - test("test mdt file path with configured paths") { - sql(s"create database carbon location '$dblocation'") - sql("use carbon") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, "/tmp/carbondata1/carbondata2/") - val (timestampFile, timestampFileType) = getMdtFileAndType() - FileFactory.deleteFile(timestampFile, timestampFileType) - sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql("drop table carbontable") - // perform file check - assert(FileFactory.isFileExist(timestampFile, true) || - CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, - CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT) - val (timestampFile2, timestampFileType2) = getMdtFileAndType() - FileFactory.deleteFile(timestampFile2, timestampFileType2) - sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql("drop table carbontable") - // perform file check - assert(FileFactory.isFileExist(timestampFile, true) || - CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) - } - override def afterEach { CarbonProperties.getInstance() 
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala index febba35..e3f1d3f 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala @@ -71,7 +71,7 @@ object CarbonSessionUtil { case _ => } isRelationRefreshed = - CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession) + CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession) case _ => } } @@ -79,12 +79,12 @@ object CarbonSessionUtil { rtnRelation match { case SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) => - isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession) + isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession) if (catalogTable.isInstanceOf[Option[CatalogTable]]) { catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone) } case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) => - isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession) + isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession) if (catalogTable.isInstanceOf[Option[CatalogTable]]) { catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone) } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala index 718bb74..fdaa3d1 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala @@ -106,6 +106,11 @@ object IndexServer extends 
ServerInterface { sparkSession.sparkContext .setLocalProperty("spark.job.description", request.getTaskGroupDesc) } + if (!request.getInvalidSegments.isEmpty) { + DistributedRDDUtils + .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName, + request.getInvalidSegments.asScala) + } val splits = new DistributedPruneRDD(sparkSession, request).collect() if (!request.isFallbackJob) { DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet) @@ -113,11 +118,6 @@ object IndexServer extends ServerInterface { if (request.isJobToClearDataMaps) { DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName) } - if (!request.getInvalidSegments.isEmpty) { - DistributedRDDUtils - .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName, - request.getInvalidSegments.asScala) - } new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index e7a6d65..c13e7b9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util._ import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF} @@ -215,38 +215,35 @@ object CarbonEnv { databaseNameOp: Option[String], tableName: String) (sparkSession: SparkSession): CarbonTable = { - 
refreshRelationFromCache(TableIdentifier(tableName, databaseNameOp))(sparkSession) - val databaseName = getDatabaseName(databaseNameOp)(sparkSession) val catalog = getInstance(sparkSession).carbonMetaStore - // refresh cache - catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp)) - - // try to get it from catch, otherwise lookup in catalog - catalog.getTableFromMetadataCache(databaseName, tableName) - .getOrElse( - catalog - .lookupRelation(databaseNameOp, tableName)(sparkSession) - .asInstanceOf[CarbonRelation] - .carbonTable) + // if the relation needs a refresh or the table is not in the cache, look it up from the catalog; otherwise return the cached table + if (isRefreshRequired(TableIdentifier(tableName, databaseNameOp))(sparkSession)) { + catalog + .lookupRelation(databaseNameOp, tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + } else { + CarbonMetadata.getInstance().getCarbonTable(databaseNameOp.getOrElse(sparkSession + .catalog.currentDatabase), tableName) + } } - def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - var isRefreshed = false + /** + * Checks whether the cached relation for the given table must be reloaded. + * @return true if the table is not present in the cache or the metastore reports its schema as + * refreshed; false if there is no change in the relation. + */ + def isRefreshRequired(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { val carbonEnv = getInstance(sparkSession) - val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache( - identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase), - identifier.table) - if (carbonEnv.carbonMetaStore - .checkSchemasModifiedTimeAndReloadTable(identifier) && table.isDefined) { - sparkSession.sessionState.catalog.refreshTable(identifier) - val tablePath = table.get.getTablePath - DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(tablePath, + val databaseName = identifier.database.getOrElse(sparkSession.catalog.currentDatabase) + val table = CarbonMetadata.getInstance().getCarbonTable(databaseName, identifier.table) + if (table == null) { + true + } else { + carbonEnv.carbonMetaStore.isSchemaRefreshed(AbsoluteTableIdentifier.from(table.getTablePath, identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase), - identifier.table, table.get.getTableInfo.getFactTable.getTableId)) - isRefreshed = true + identifier.table, table.getTableInfo.getFactTable.getTableId), sparkSession) } - isRefreshed } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index b4e60fb..1fa1337 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -67,7 +67,6 @@ case class CarbonDropDataMapCommand( val carbonEnv = CarbonEnv.getInstance(sparkSession) val catalog = carbonEnv.carbonMetaStore val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession) - catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) if (mainTable == null) { mainTable = try { CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 6a9ac0a..cebf606 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -105,8 +105,6 @@ case class RefreshCarbonTableCommand( } } } - // update the schema modified time - metaStore.updateAndTouchSchemasUpdatedTime() Seq.empty } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index 566e44e..507fe02 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -60,14 +60,8 @@ case class CarbonAlterTableDropPartitionCommand( setAuditTable(dbName, tableName) setAuditInfo(Map("partition" -> model.partitionId)) val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore - val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] - val tablePath = relation.carbonTable.getTablePath - carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) { - throwMetadataException(dbName, tableName, "table not found") - } - val carbonTable = relation.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sparkSession) + val tablePath = carbonTable.getTablePath val partitionInfo = carbonTable.getPartitionInfo(tableName) if (partitionInfo == null) { throwMetadataException(dbName, tableName, "table is not a partition table") @@ -116,8 +110,6 @@ case class CarbonAlterTableDropPartitionCommand( thriftTable, null, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) - // 
update the schema modified time - carbonMetaStore.updateAndTouchSchemasUpdatedTime() // sparkSession.catalog.refreshTable(tableName) Seq.empty } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 72c3142..36ddce4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -69,7 +69,6 @@ case class CarbonAlterTableSplitPartitionCommand( if (relation == null) { throwMetadataException(dbName, tableName, "table not found") } - carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) if (null == (CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession))) { LOGGER.error(s"Alter table failed. 
table not found: $dbName.$tableName") throwMetadataException(dbName, tableName, "table not found") @@ -107,8 +106,6 @@ case class CarbonAlterTableSplitPartitionCommand( thriftTable, null, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) - // update the schema modified time - carbonMetaStore.updateAndTouchSchemasUpdatedTime() Seq.empty } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index db52361..480d1f7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -471,7 +471,7 @@ object PreAggregateUtil { val dbName = carbonTable.getDatabaseName val tableName = carbonTable.getTableName CarbonEnv.getInstance(sparkSession).carbonMetaStore - .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier, + .updateTableSchema(carbonTable.getCarbonTableIdentifier, carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) @@ -539,23 +539,6 @@ object PreAggregateUtil { } } - def getChildCarbonTable(databaseName: String, tableName: String) - (sparkSession: SparkSession): Option[CarbonTable] = { - val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore - val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName) - if (carbonTable.isEmpty) { - try { - Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation].metaData.carbonTable) - } catch { - case _: Exception => - None - } - } else { - carbonTable - } - } - /** * Below method will be used to update logical plan * this is required for creating pre aggregate tables, diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index ff0177b..6b80bbe 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand +import org.apache.spark.sql.hive.CarbonFileMetastore import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree @@ -183,6 +184,18 @@ case class CarbonDropTableCommand( ifExistsSet, sparkSession) OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext) + // Remove all invalid entries of carbonTable and corresponding updated timestamp + // values from the cache. This case is valid when there are 2 JDBCServer and one of them + // drops the table, the other server would not be able to clear its cache. 
+ try { + CarbonEnv.getInstance(sparkSession).carbonMetaStore match { + case metastore: CarbonFileMetastore => metastore.removeStaleTimeStampEntries(sparkSession) + case _ => + } + } catch { + case _: Exception => + // Do nothing + } } catch { case ex: NoSuchTableException => if (!ifExistsSet) { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index ea3bba8..7ab2d47 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.hive import java.io.IOException import java.net.URI +import java.util.Locale +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock} +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession} import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} import org.apache.spark.sql.catalyst.TableIdentifier @@ -36,7 +38,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory @@ -87,59 +88,95 @@ object MatchLogicalRelation { } } -class CarbonFileMetastore extends CarbonMetaStore { +private object CarbonFileMetastore { + + final val tableModifiedTimeStore = new 
ConcurrentHashMap[String, Long]() + + def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier, + localTimeStamp: Long): Boolean = synchronized { + val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath) + val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath) + if (schemaCarbonFile.exists()) { + val oldTime = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableId)) + val newTime = schemaCarbonFile.getLastModifiedTime + val isSchemaModified = oldTime match { + case Some(cacheTime) => + cacheTime != newTime + case None => true + } + if (isSchemaModified) { + CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier + .getCarbonTableIdentifier.getTableUniqueName) + DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) + true + } else { + localTimeStamp != newTime + } + } else { + false + } + } - @transient - val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") + def updateTableSchemaModifiedTime(tableUniqueId: String, timeStamp: Long): Unit = { + tableModifiedTimeStore.put(tableUniqueId, timeStamp) + } - val tableModifiedTimeStore = new java.util.HashMap[String, Long]() - tableModifiedTimeStore - .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis()) + def getTableModifiedTime(tableUniqueId: String): Long = { + tableModifiedTimeStore.get(tableUniqueId) + } - def nextQueryId: String = { - System.nanoTime() + "" + def removeStaleEntries(invalidTableUniqueIds: List[String]) { + for (invalidKey <- invalidTableUniqueIds) { + tableModifiedTimeStore.remove(invalidKey) + } } +} - val metadata = MetaData(new ArrayBuffer[CarbonTable]()) +class CarbonFileMetastore extends CarbonMetaStore { + @transient private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]() /** - * Create 
spark session from paramters. - * - * @param parameters - * @param absIdentifier - * @param sparkSession + * Create Carbon Relation by reading the schema file */ override def createCarbonRelation(parameters: Map[String, String], absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName val tableName = absIdentifier.getCarbonTableIdentifier.getTableName - val tables = getTableFromMetadataCache(database, tableName) + val tables = Option(CarbonMetadata.getInstance.getCarbonTable(database, tableName)) tables match { case Some(t) => - CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t) - case None => - readCarbonSchema(absIdentifier, - !parameters.getOrElse("isTransactional", "true").toBoolean) match { - case Some(meta) => - CarbonRelation(database, tableName, - CarbonSparkUtil.createSparkMeta(meta), meta) - case None => - throw new NoSuchTableException(database, tableName) + if (isSchemaRefreshed(absIdentifier, sparkSession)) { + readCarbonSchema(absIdentifier, parameters) + } else { + CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t) } + case None => + readCarbonSchema(absIdentifier, parameters) + } + } + + private def readCarbonSchema(absIdentifier: AbsoluteTableIdentifier, + parameters: Map[String, String]): CarbonRelation = { + readCarbonSchema(absIdentifier, + !parameters.getOrElse("isTransactional", "true").toBoolean) match { + case Some(meta) => + CarbonRelation(absIdentifier.getDatabaseName, absIdentifier.getTableName, + CarbonSparkUtil.createSparkMeta(meta), meta) + case None => + throw new NoSuchTableException(absIdentifier.getDatabaseName, absIdentifier.getTableName) } } /** - * This method will overwrite the existing schema and update it with the given details - * - * @param newTableIdentifier - * @param thriftTableInfo - * @param carbonStorePath - * @param sparkSession + * This method will overwrite the 
existing schema and update it with the given details. */ - def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, carbonStorePath: String)(sparkSession: SparkSession): String = { @@ -174,13 +211,17 @@ class CarbonFileMetastore extends CarbonMetaStore { catalogTable.provider match { case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource") || name.equalsIgnoreCase("carbondata")) => name - case _ => throw new NoSuchTableException(database, tableIdentifier.table) + case _ => + CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table) + throw new NoSuchTableException(database, tableIdentifier.table) } val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( catalogTable.location.toString, database, tableIdentifier.table) CarbonEnv.getInstance(sparkSession).carbonMetaStore. 
createCarbonRelation(catalogTable.storage.properties, identifier, sparkSession) - case _ => throw new NoSuchTableException(database, tableIdentifier.table) + case _ => + CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table) + throw new NoSuchTableException(database, tableIdentifier.table) } // fire post event after lookup relation @@ -193,22 +234,6 @@ class CarbonFileMetastore extends CarbonMetaStore { relation } - /** - * This method will search for a table in the catalog metadata - * - * @param database - * @param tableName - * @return - */ - def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = { - metadata.readLock.lock() - val ret = metadata.carbonTables - .find(table => table.getDatabaseName.equalsIgnoreCase(database) && - table.getTableName.equalsIgnoreCase(tableName)) - metadata.readLock.unlock() - ret - } - def tableExists( table: String, databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = { @@ -226,71 +251,60 @@ class CarbonFileMetastore extends CarbonMetaStore { true } - def isTableInMetastore(identifier: AbsoluteTableIdentifier, - sparkSession: SparkSession): Boolean = { - sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName) - .exists(_.table.equalsIgnoreCase(identifier.getTableName)) - } - - private def readCarbonSchema(identifier: AbsoluteTableIdentifier, inferSchema: Boolean): Option[CarbonTable] = { - val schemaConverter = new ThriftWrapperSchemaConverterImpl val dbName = identifier.getCarbonTableIdentifier.getDatabaseName val tableName = identifier.getCarbonTableIdentifier.getTableName val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) val tablePath = identifier.getTablePath + var schemaRefreshTime = System.currentTimeMillis() val wrapperTableInfo = - if (inferSchema) { - val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName) - val tblInfoFromCache = if (carbonTbl != null) { - carbonTbl.getTableInfo - } else { 
- null - } - - val thriftTableInfo : TableInfo = if (tblInfoFromCache != null) { - // In case the TableInfo is present in the Carbon Metadata Cache - // then get the tableinfo from the cache rather than infering from - // the CarbonData file. - schemaConverter - .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName) - } else { - schemaConverter - .fromWrapperToExternalTableInfo(SchemaReader - .inferSchema(identifier, false), - dbName, tableName) - } + if (inferSchema) { + val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName) + val tblInfoFromCache = if (carbonTbl != null) { + carbonTbl.getTableInfo + } else { + null + } - val wrapperTableInfo = - schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) - wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") - wrapperTableInfo.setTransactionalTable(false) - Some(wrapperTableInfo) - } else { - val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) - val fileType = FileFactory.getFileType(tableMetadataFile) - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val thriftTableInfo: TableInfo = if (tblInfoFromCache != null) { + // In case the TableInfo is present in the Carbon Metadata Cache + // then get the tableinfo from the cache rather than infering from + // the CarbonData file. 
+ schemaConverter + .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName) + } else { + schemaConverter + .fromWrapperToExternalTableInfo(SchemaReader + .inferSchema(identifier, false), + dbName, tableName) + } val wrapperTableInfo = - schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) + wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") + wrapperTableInfo.setTransactionalTable(false) Some(wrapperTableInfo) } else { - None + val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) + schemaRefreshTime = FileFactory + .getCarbonFile(CarbonTablePath.getSchemaFilePath(tablePath)).getLastModifiedTime + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val wrapperTableInfo = + schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + Some(wrapperTableInfo) + } else { + None + } } - } - - wrapperTableInfo.map { tableInfo => + updateSchemasUpdatedTime(tableInfo.getFactTable.getTableId, schemaRefreshTime) CarbonMetadata.getInstance().removeTable(tableUniqueName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metadata.writeLock.lock() - metadata.carbonTables += carbonTable - metadata.writeLock.unlock() - carbonTable + CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) } } @@ -319,17 +333,12 @@ class CarbonFileMetastore extends CarbonMetaStore { newTableIdentifier.getTableName, oldTableIdentifier.getTableId) val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo) - addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier) - + addCarbonTableToCache(wrapperTableInfo, newAbsoluteTableIdentifier) 
path } /** * This method will is used to remove the evolution entry in case of failure. - * - * @param carbonTableIdentifier - * @param thriftTableInfo - * @param sparkSession */ def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, @@ -343,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore { val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo) - addTableCache(wrapperTableInfo, absoluteTableIdentifier) + addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier) path } @@ -360,7 +369,7 @@ class CarbonFileMetastore extends CarbonMetaStore { val childSchemaList = wrapperTableInfo.getDataMapSchemaList childSchemaList.remove(childSchemaList.size() - 1) val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo) - addTableCache(wrapperTableInfo, absoluteTableIdentifier) + addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier) path } @@ -393,7 +402,7 @@ class CarbonFileMetastore extends CarbonMetaStore { tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - addTableCache(tableInfo, absoluteTableIdentifier) + addCarbonTableToCache(tableInfo, absoluteTableIdentifier) CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") } @@ -417,61 +426,27 @@ class CarbonFileMetastore extends CarbonMetaStore { thriftWriter.open(FileWriteOperation.OVERWRITE) thriftWriter.write(thriftTableInfo) thriftWriter.close() - updateSchemasUpdatedTime(touchSchemaFileSystemTime()) + val modifiedTime = System.currentTimeMillis() + FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime) + 
updateSchemasUpdatedTime(identifier.getCarbonTableIdentifier.getTableId, modifiedTime) identifier.getTablePath } - protected def addTableCache( + protected def addCarbonTableToCache( tableInfo: table.TableInfo, - absoluteTableIdentifier: AbsoluteTableIdentifier): ArrayBuffer[CarbonTable] = { + absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = { val identifier = absoluteTableIdentifier.getCarbonTableIdentifier - CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName) removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - metadata.writeLock.lock() - metadata.carbonTables += - CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName) - metadata.writeLock.unlock() - metadata.carbonTables } /** - * This method will remove the table meta from catalog metadata array - * - * @param dbName - * @param tableName + * This method will remove the table meta from CarbonMetadata cache. 
*/ def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName) - carbonTableToBeRemoved match { - case Some(carbonTable) => - metadata.writeLock.lock() - metadata.carbonTables -= carbonTable - metadata.writeLock.unlock() - case None => - if (LOGGER.isDebugEnabled) { - LOGGER.debug(s"No entry for table $tableName in database $dbName") - } - } CarbonMetadata.getInstance.removeTable(dbName, tableName) } - private def updateMetadataByWrapperTable( - wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = { - - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable( - wrapperTableInfo.getTableUniqueName) - for (i <- metadata.carbonTables.indices) { - metadata.writeLock.lock() - if (wrapperTableInfo.getTableUniqueName.equals( - metadata.carbonTables(i).getTableUniqueName)) { - metadata.carbonTables(i) = carbonTable - } - metadata.writeLock.unlock() - } - } - def updateMetadataByThriftTable(schemaFilePath: String, tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = { tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) @@ -479,7 +454,8 @@ class CarbonFileMetastore extends CarbonMetaStore { val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) - updateMetadataByWrapperTable(wrapperTableInfo) + addCarbonTableToCache(wrapperTableInfo, + wrapperTableInfo.getOrCreateAbsoluteTableIdentifier()) } @@ -496,133 +472,65 @@ class CarbonFileMetastore extends CarbonMetaStore { } - def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier) - (sparkSession: SparkSession) { + def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) { val dbName = 
absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName - val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath) val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } - val fileType = FileFactory.getFileType(metadataFilePath) - - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - // while drop we should refresh the schema modified time so that if any thing has changed - // in the other beeline need to update. - checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - - CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) - updateSchemasUpdatedTime(touchSchemaFileSystemTime()) - // discard cached table info in cachedDataSourceTables - val tableIdentifier = TableIdentifier(tableName, Option(dbName)) - sparkSession.sessionState.catalog.refreshTable(tableIdentifier) - DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) - SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) - removeTableFromMetadata(dbName, tableName) - } else { - if (!isTransactionalCarbonTable(absoluteTableIdentifier)) { - removeTableFromMetadata(dbName, tableName) - CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) - // discard cached table info in cachedDataSourceTables - val tableIdentifier = TableIdentifier(tableName, Option(dbName)) - sparkSession.sessionState.catalog.refreshTable(tableIdentifier) - DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) - SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) - removeTableFromMetadata(dbName, tableName) - } - } + CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, 
tableName, sparkSession) + // discard cached table info in cachedDataSourceTables + val tableIdentifier = TableIdentifier(tableName, Option(dbName)) + sparkSession.sessionState.catalog.refreshTable(tableIdentifier) + DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) + SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) + removeTableFromMetadata(dbName, tableName) } def isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = { - val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName) - table.map(_.getTableInfo.isTransactionalTable).getOrElse(true) - } - - private def getTimestampFileAndType() = { - var basePath = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, - CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT) - basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath) - val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE - val timestampFileType = FileFactory.getFileType(timestampFile) - if (!FileFactory.isFileExist(basePath, timestampFileType)) { - FileFactory - .createDirectoryAndSetPermission(basePath, - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + val table = Option(CarbonMetadata.getInstance() + .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)) + table match { + case Some(t) => t.isTransactionalTable + case None => true } - (timestampFile, timestampFileType) } /** * This method will put the updated timestamp of schema file in the table modified time store map - * - * @param timeStamp */ - private def updateSchemasUpdatedTime(timeStamp: Long) { - tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp) - } - - def updateAndTouchSchemasUpdatedTime() { - updateSchemasUpdatedTime(touchSchemaFileSystemTime()) + private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) { + 
tableModifiedTimeStore.put(tableUniqueId, timeStamp) + CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp) } - /** - * This method will check and create an empty schema timestamp file - * - * @return - */ - private def touchSchemaFileSystemTime(): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType() - if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.info(s"Creating timestamp file for $timestampFile") - FileFactory - .createNewFile(timestampFile, - timestampFileType, - true, - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) - } - FileFactory.getCarbonFile(timestampFile, timestampFileType) - .setLastModifiedTime(System.currentTimeMillis()) - // since there is no guarantee that exact same set modified time returns when called - // lastmodified time, so better get the time from file. - FileFactory.getCarbonFile(timestampFile, timestampFileType) - .getLastModifiedTime - } - - def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = { - val (timestampFile, timestampFileType) = getTimestampFileAndType() - var isRefreshed = false - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - val lastModifiedTime = - FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime - if (!(lastModifiedTime == - tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) { - metadata.writeLock.lock() - metadata.carbonTables = metadata.carbonTables.filterNot( - table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) && - table.getDatabaseName - .equalsIgnoreCase(tableIdentifier.database - .getOrElse(SparkSession.getActiveSession.get.sessionState.catalog - .getCurrentDatabase))) - metadata.writeLock.unlock() - updateSchemasUpdatedTime(lastModifiedTime) - isRefreshed = true + override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean = { + val 
localTimeStamp = Option(tableModifiedTimeStore.get(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableId)) + if (localTimeStamp.isDefined) { + if (CarbonFileMetastore.checkIfRefreshIsNeeded(absoluteTableIdentifier, localTimeStamp.get)) { + sparkSession.sessionState + .catalog.refreshTable(TableIdentifier(absoluteTableIdentifier.getTableName, + Some(absoluteTableIdentifier.getDatabaseName))) + true + } else { + false } + } else { + true } - isRefreshed } override def isReadFromHiveMetaStore: Boolean = false override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = { - metadata.readLock.lock - val ret = metadata.carbonTables.clone() - metadata.readLock.unlock - ret + CarbonMetadata.getInstance().getAllTables.asScala } @@ -672,4 +580,21 @@ class CarbonFileMetastore extends CarbonMetaStore { case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table) } } + + def removeStaleTimeStampEntries(sparkSession: SparkSession): Unit = { + val tablesList = sparkSession.sessionState.catalog.listDatabases().flatMap { + database => + sparkSession.sessionState.catalog.listTables(database) + .map(table => s"${ database }_${ table.table }".toLowerCase(Locale.getDefault())) + } + val invalidTableIds = CarbonMetadata.getInstance().getAllTables.asScala.collect { + case carbonTable if !tablesList.contains(carbonTable.getTableUniqueName + .toLowerCase(Locale.getDefault())) => + CarbonMetadata.getInstance() + .removeTable(carbonTable.getTableUniqueName.toLowerCase(Locale.getDefault())) + carbonTable.getTableId + } + CarbonFileMetastore.removeStaleEntries(invalidTableIds.toList) + } + } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index c8c7d31..a0a8c0f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -63,6 +63,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { carbonRelation } + override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean = true override def isTablePathExists(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): Boolean = { @@ -78,7 +80,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } - checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables val tableIdentifier = TableIdentifier(tableName, Option(dbName)) @@ -88,11 +89,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { removeTableFromMetadata(dbName, tableName) } - override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = { - // do nothing - false - } - override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = { // Todo Seq() @@ -140,7 +136,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { * @param carbonTablePath * @param sparkSession */ - override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, carbonTablePath: String)(sparkSession: SparkSession): String = { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index f97a8ae..95efa6a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -82,7 +82,7 @@ trait CarbonMetaStore { * @param carbonStorePath * @param sparkSession */ - def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, carbonStorePath: String)(sparkSession: SparkSession): String @@ -135,9 +135,8 @@ trait CarbonMetaStore { def dropTable(tableIdentifier: AbsoluteTableIdentifier) (sparkSession: SparkSession) - def updateAndTouchSchemasUpdatedTime() - - def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean + def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean def isReadFromHiveMetaStore: Boolean @@ -147,8 +146,6 @@ trait CarbonMetaStore { carbonTable: CarbonTable ): org.apache.carbondata.format.TableInfo - def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] - /** * Method will be used to retrieve or create carbon data source relation * @@ -173,6 +170,7 @@ trait CarbonMetaStore { val df: DataFrame = Dataset.ofRows(sparkSession, query) df.schema } + } /** * Factory for Carbon metastore diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala index cb3ae29..a9cb652 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala @@ -69,9 +69,6 @@ object CleanFiles { forceTableClean = args(2).toBoolean } val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName") - CarbonEnv.getInstance(spark).carbonMetaStore. 
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - cleanFiles(spark, dbName, tableName, forceTableClean) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala index 0a3a870..91203a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala @@ -58,8 +58,6 @@ object Compaction { val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1))) val compactionType = TableAPIUtil.escape(args(2)) val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName") - CarbonEnv.getInstance(spark).carbonMetaStore. - checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) compaction(spark, dbName, tableName, compactionType) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala index 90a37f6..6149421 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala @@ -45,8 +45,6 @@ object DeleteSegmentByDate { val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1))) val dateValue = TableAPIUtil.escape(args(2)) val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName") - CarbonEnv.getInstance(spark).carbonMetaStore. 
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) deleteSegmentByDate(spark, dbName, tableName, dateValue) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala index 15bec02..023c7bf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala @@ -50,8 +50,6 @@ object DeleteSegmentById { val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1))) val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2))) val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName") - CarbonEnv.getInstance(spark).carbonMetaStore. - checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) deleteSegmentById(spark, dbName, tableName, segmentIds) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala index efaa191..ef9a931 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala @@ -82,8 +82,6 @@ object TableLoader { val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName") - CarbonEnv.getInstance(spark).carbonMetaStore. 
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) loadTable(spark, Option(dbName), tableName, inputPaths, map) } diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 26f778e..75e7d89 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -125,9 +125,9 @@ class CarbonHiveSessionCatalog( rtnRelation match { case SubqueryAlias(_, LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) => - toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) - case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) + toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession) + case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => + toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession) case _ => } @@ -138,31 +138,6 @@ class CarbonHiveSessionCatalog( } } - private def refreshRelationFromCache(identifier: TableIdentifier, - alias: Option[String], - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { - var isRefreshed = false - val storePath = CarbonProperties.getStorePath - carbonEnv.carbonMetaStore. 
- checkSchemasModifiedTimeAndReloadTable(identifier) - - val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache( - carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, - carbonDatasourceHadoopRelation.carbonTable.getTableName) - if (table.isEmpty || (table.isDefined && - table.get.getTableLastUpdatedTime != - carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { - refreshTable(identifier) - DataMapStoreManager.getInstance(). - clearDataMaps(AbsoluteTableIdentifier.from(storePath, - identifier.database.getOrElse("default"), - identifier.table)) - isRefreshed = true - logInfo(s"Schema changes have been detected for table: $identifier") - } - isRefreshed - } - /** * returns hive client from session state * diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala index dd1fa0f..d368a8e 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala @@ -133,10 +133,10 @@ class AlterTableColumnRenameTestCase extends Spark2QueryTest with BeforeAndAfter sql("alter table rename change empname name string") sql("update rename set (name) = ('joey') where workgroupcategory = 'developer'").show() sql("insert into rename select 20,'bill','PM','01-12-2015',3,'manager',14,'Learning',928479,'01-01-2016','30-11-2016',75,94,13547") - val df1 = sql("select * from rename where name = 'joey'") + val df1Count = sql("select * from rename where name = 'joey'").count sql("alter table rename change name empname string") val df2 = sql("select * from rename where empname = 'joey'") - assert(df1.count() == df2.count()) + assert(df1Count == df2.count()) 
sql("delete from rename where empname = 'joey'") val df3 = sql("select empname from rename") sql("alter table rename change empname newname string")