This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a5344df [CARBONDATA-3480] Fixed unnecessary refresh for table by removing modified mdt file
a5344df is described below
commit a5344df2bfe20560324f9a0b1ef92051540e70d8
Author: kunal642 <[email protected]>
AuthorDate: Fri Jul 26 14:52:36 2019 +0530
[CARBONDATA-3480] Fixed unnecessary refresh for table by removing modified mdt file
This closes #3339
---
.../carbondata/core/datamap/DataMapFilter.java | 47 +++
.../core/datamap/DataMapStoreManager.java | 14 +-
.../carbondata/core/metadata/CarbonMetadata.java | 9 +
.../core/metadata/schema/table/CarbonTable.java | 4 +-
.../core/metadata/schema/table/TableSchema.java | 4 +
.../statusmanager/SegmentUpdateStatusManager.java | 26 --
.../apache/carbondata/core/util/CarbonUtil.java | 1 -
.../core/metadata/CarbonMetadataTest.java | 7 +-
.../ThriftWrapperSchemaConverterImplTest.java | 4 +-
.../metadata/schema/table/CarbonTableTest.java | 8 +-
.../table/CarbonTableWithComplexTypesTest.java | 6 +-
.../dblocation/DBLocationCarbonTableTestCase.scala | 25 --
.../apache/spark/sql/hive/CarbonSessionUtil.scala | 6 +-
.../carbondata/indexserver/IndexServer.scala | 10 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 51 ++-
.../command/datamap/CarbonDropDataMapCommand.scala | 1 -
.../management/RefreshCarbonTableCommand.scala | 2 -
.../CarbonAlterTableDropPartitionCommand.scala | 12 +-
.../CarbonAlterTableSplitPartitionCommand.scala | 3 -
.../command/preaaggregate/PreAggregateUtil.scala | 19 +-
.../command/table/CarbonDropTableCommand.scala | 13 +
.../spark/sql/hive/CarbonFileMetastore.scala | 425 +++++++++------------
.../spark/sql/hive/CarbonHiveMetaStore.scala | 10 +-
.../apache/spark/sql/hive/CarbonMetaStore.scala | 10 +-
.../scala/org/apache/spark/util/CleanFiles.scala | 3 -
.../scala/org/apache/spark/util/Compaction.scala | 2 -
.../apache/spark/util/DeleteSegmentByDate.scala | 2 -
.../org/apache/spark/util/DeleteSegmentById.scala | 2 -
.../scala/org/apache/spark/util/TableLoader.scala | 2 -
.../apache/spark/sql/hive/CarbonSessionState.scala | 31 +-
.../AlterTableColumnRenameTestCase.scala | 4 +-
31 files changed, 322 insertions(+), 441 deletions(-)
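At a high level, the patch replaces the single shared "modified mdt" timestamp file with per-table schema modification times tracked in memory, so a table is refreshed only when its own schema file changes. A minimal sketch of that decision, assuming a simplified cache keyed by table id (names here are illustrative, not the patch's actual API):

  import java.util.concurrent.ConcurrentHashMap

  // Sketch only: per-table refresh decision in the spirit of
  // CarbonFileMetastore.checkIfRefreshIsNeeded below.
  object RefreshSketch {
    // tableId -> schema-file modification time last seen by this driver
    private val seen = new ConcurrentHashMap[String, Long]()

    // True when the schema file on disk is newer than the cached timestamp,
    // i.e. another session altered or re-created the table.
    def needsRefresh(tableId: String, schemaFileMTime: Long): Boolean = {
      if (seen.containsKey(tableId) && seen.get(tableId) == schemaFileMTime) {
        false
      } else {
        seen.put(tableId, schemaFileMTime)
        true
      }
    }
  }

Because the check is per table, dropping or altering one table no longer forces every other cached table to be reloaded.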
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index c20d0d5..ac4886d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -18,10 +18,15 @@
package org.apache.carbondata.core.datamap;
import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -39,9 +44,51 @@ public class DataMapFilter implements Serializable {
public DataMapFilter(CarbonTable table, Expression expression) {
this.table = table;
this.expression = expression;
+ if (expression != null) {
+ checkIfFilterColumnExistsInTable();
+ }
resolve();
}
+ private Set<String> extractColumnExpressions(Expression expression) {
+ Set<String> columnExpressionList = new HashSet<>();
+ for (Expression expressions: expression.getChildren()) {
+ if (expressions != null && expressions.getChildren() != null
+ && expressions.getChildren().size() > 0) {
+ columnExpressionList.addAll(extractColumnExpressions(expressions));
+ } else if (expressions instanceof ColumnExpression) {
+ columnExpressionList.add(((ColumnExpression) expressions).getColumnName());
+ }
+ }
+ return columnExpressionList;
+ }
+
+ private void checkIfFilterColumnExistsInTable() {
+ Set<String> columnExpressionList = extractColumnExpressions(expression);
+ for (String colExpression : columnExpressionList) {
+ if (colExpression.equalsIgnoreCase("positionid")) {
+ continue;
+ }
+ boolean exists = false;
+ for (CarbonMeasure carbonMeasure : table.getAllMeasures()) {
+ if (!carbonMeasure.isInvisible() && carbonMeasure.getColName()
+ .equalsIgnoreCase(colExpression)) {
+ exists = true;
+ }
+ }
+ for (CarbonDimension carbonDimension : table.getAllDimensions()) {
+ if (!carbonDimension.isInvisible() && carbonDimension.getColName()
+ .equalsIgnoreCase(colExpression)) {
+ exists = true;
+ }
+ }
+ if (!exists) {
+ throw new RuntimeException(
+ "Column " + colExpression + " not found in table " +
table.getTableUniqueName());
+ }
+ }
+ }
+
public DataMapFilter(FilterResolverIntf resolver) {
this.resolver = resolver;
}
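The extraction above walks the expression tree recursively, collecting every ColumnExpression name. A standalone sketch of the same traversal over a toy expression type (Expr, Col, and And are illustrative stand-ins, not CarbonData's Expression classes):

  sealed trait Expr { def children: Seq[Expr] = Nil }
  case class Col(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr {
    override def children: Seq[Expr] = Seq(left, right)
  }

  // Mirrors extractColumnExpressions: leaves contribute their column name,
  // inner nodes contribute the union of their children's names.
  def columnNames(e: Expr): Set[String] = e match {
    case Col(name) => Set(name)
    case other => other.children.flatMap(columnNames).toSet
  }

  // columnNames(And(Col("c1"), And(Col("c2"), Col("c1")))) == Set("c1", "c2")

With the names in hand, checkIfFilterColumnExistsInTable fails fast with a RuntimeException for any filter column that is neither a visible measure nor a visible dimension (the pseudo-column "positionid" is exempt).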
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a6a2031..ce0d6a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -699,8 +699,13 @@ public final class DataMapStoreManager {
SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
- segmentRefreshTime.put(updateVO.getSegmentId(),
- new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0));
+ SegmentRefreshInfo segmentRefreshInfo;
+ if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
+ segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
+ } else {
+ segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+ }
+ segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo);
}
}
@@ -708,8 +713,11 @@ public final class DataMapStoreManager {
SegmentRefreshInfo segmentRefreshInfo = seg.getSegmentRefreshInfo(updateVo);
String segmentId = seg.getSegmentNo();
+ if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) {
+ return false;
+ }
if (segmentRefreshTime.get(segmentId) == null
- && segmentRefreshInfo.getSegmentUpdatedTimestamp() != null) {
+ && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
segmentRefreshTime.put(segmentId, segmentRefreshInfo);
return true;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index e44092e..9c10a03 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -180,4 +181,12 @@ public final class CarbonMetadata {
}
return null;
}
+
+ public List<CarbonTable> getAllTables() {
+ return new ArrayList<>(tableInfoMap.values());
+ }
+
+ public void clearAll() {
+ tableInfoMap.clear();
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1be1624..aa82b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -286,7 +287,8 @@ public class CarbonTable implements Serializable, Writable {
* @return
*/
public static String buildUniqueName(String databaseName, String tableName) {
- return databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
+ return (databaseName + CarbonCommonConstants.UNDERSCORE + tableName).toLowerCase(
+ Locale.getDefault());
}
/**
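The unique name is now normalized to lower case, so cache lookups no longer depend on the caller's casing. An illustrative check of the new behaviour, assuming the default locale:

  import java.util.Locale

  // Both spellings resolve to the same cache key after this change.
  val unique = ("carbonTestDatabase" + "_" + "carbonTestTable").toLowerCase(Locale.getDefault)
  assert(unique == "carbontestdatabase_carbontesttable")

This is why the test expectations further down change from "carbonTestTable" to "carbontesttable".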
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 4425697..61b2987 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -142,6 +143,9 @@ public class TableSchema implements Serializable, Writable {
* @param tableName the tableName to set
*/
public void setTableName(String tableName) {
+ if (tableName != null) {
+ tableName = tableName.toLowerCase(Locale.getDefault());
+ }
this.tableName = tableName;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index eace1b7..f7083dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -775,32 +775,6 @@ public class SegmentUpdateStatusManager {
}
/**
- * Returns the invalid timestamp range of a segment.
- * @return
- */
- public List<UpdateVO> getInvalidTimestampRange() {
- List<UpdateVO> ranges = new ArrayList<UpdateVO>();
- for (LoadMetadataDetails segment : segmentDetails) {
- if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
- || SegmentStatus.COMPACTED == segment.getSegmentStatus()
- || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
- UpdateVO range = new UpdateVO();
- range.setSegmentId(segment.getLoadName());
- range.setFactTimestamp(segment.getLoadStartTime());
- if (!segment.getUpdateDeltaStartTimestamp().isEmpty() &&
- !segment.getUpdateDeltaEndTimestamp().isEmpty()) {
- range.setUpdateDeltaStartTimestamp(
- CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
- range.setLatestUpdateTimestamp(
- CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
- }
- ranges.add(range);
- }
- }
- return ranges;
- }
-
- /**
*
* @param block
* @param needCompleteList
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a86690c..cb23e3e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2239,7 +2239,6 @@ public final class CarbonUtil {
org.apache.carbondata.format.TableInfo tableInfo =
new org.apache.carbondata.format.TableInfo(thriftFactTable,
new ArrayList<org.apache.carbondata.format.TableSchema>());
-
tableInfo.setDataMapSchemas(null);
return tableInfo;
}
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0ec19f2..6d8dac6 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -47,6 +47,7 @@ public class CarbonMetadataTest {
@BeforeClass public static void setUp() {
carbonMetadata = CarbonMetadata.getInstance();
+ carbonMetadata.clearAll();
carbonMetadata.loadTableMetadata(getTableInfo(10000));
tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase",
"carbonTestTable");
}
@@ -77,13 +78,13 @@ public class CarbonMetadataTest {
@Test public void testGetCarbonTableReturingProperTableWithProperDimensionCount() {
int expectedResult = 1;
assertEquals(expectedResult,
- carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbonTestTable"));
+ carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbontesttable"));
}
@Test public void testGetCarbonTableReturingProperTableWithProperMeasureCount() {
int expectedResult = 1;
assertEquals(expectedResult,
- carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbonTestTable"));
+ carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbontesttable"));
}
@Test public void testGetCarbonTableReturingProperTableWithProperDatabaseName() {
@@ -92,7 +93,7 @@ public class CarbonMetadataTest {
}
@Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
- String expectedResult = "carbonTestTable";
+ String expectedResult = "carbontesttable";
assertEquals(expectedResult,
carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
}
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 897c3cf..f03e193 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -1704,7 +1704,7 @@ public class ThriftWrapperSchemaConverterImplTest {
@Test public void testFromExternalToWrapperTableSchema() {
String tableId = "1";
- String tableName = "tableName";
+ String tableName = "tablename";
TableSchema actualResult =
thriftWrapperSchemaConverter.fromExternalToWrapperTableSchema(tabSchema, "tableName");
assertEquals(tableId, actualResult.getTableId());
@@ -1729,7 +1729,7 @@ public class ThriftWrapperSchemaConverterImplTest {
TableInfo actualResult = thriftWrapperSchemaConverter
.fromExternalToWrapperTableInfo(externalTableInfo, "dbName", "tableName", "/path");
assertEquals(time, actualResult.getLastUpdatedTime());
- assertEquals("dbName_tableName", actualResult.getTableUniqueName());
+ assertEquals("dbname_tablename", actualResult.getTableUniqueName());
}
}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index ec1303f..0f9e252 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -44,11 +44,11 @@ public class CarbonTableTest extends TestCase {
}
@Test public void testNumberOfDimensionReturnsProperCount() {
- assertEquals(1, carbonTable.getNumberOfDimensions("carbonTestTable"));
+ assertEquals(1, carbonTable.getNumberOfDimensions("carbontesttable"));
}
@Test public void testNumberOfMeasureReturnsProperCount() {
- assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
+ assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}
@Test public void testGetDatabaseNameResturnsDatabaseName() {
@@ -56,7 +56,7 @@ public class CarbonTableTest extends TestCase {
}
@Test public void testFactTableNameReturnsProperFactTableName() {
- assertEquals("carbonTestTable", carbonTable.getTableName());
+ assertEquals("carbontesttable", carbonTable.getTableName());
}
@Test public void testTableUniqueNameIsProper() {
@@ -65,7 +65,7 @@ public class CarbonTableTest extends TestCase {
@Test public void testDimensionPresentInTableIsProper() {
CarbonDimension dimension = new CarbonDimension(getColumnarDimensionColumn(), 0, -1, -1);
- assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension));
+ assertTrue(carbonTable.getDimensionByName("carbontesttable", "IMEI").equals(dimension));
}
static ColumnSchema getColumnarDimensionColumn() {
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index d3403d5..0d0d4df 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -43,11 +43,11 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
}
@Test public void testNumberOfDimensionReturnsProperCount() {
- assertEquals(2, carbonTable.getNumberOfDimensions("carbonTestTable"));
+ assertEquals(2, carbonTable.getNumberOfDimensions("carbontesttable"));
}
@Test public void testNumberOfMeasureReturnsProperCount() {
- assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
+ assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}
@Test public void testGetDatabaseNameResturnsDatabaseName() {
@@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
}
@Test public void testFactTableNameReturnsProperFactTableName() {
- assertEquals("carbonTestTable", carbonTable.getTableName());
+ assertEquals("carbontesttable", carbonTable.getTableName());
}
@Test public void testTableUniqueNameIsProper() {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
index 37ad08c..236dcfc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
@@ -261,31 +261,6 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
sql("drop table carbontable")
}
- test("test mdt file path with configured paths") {
- sql(s"create database carbon location '$dblocation'")
- sql("use carbon")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, "/tmp/carbondata1/carbondata2/")
- val (timestampFile, timestampFileType) = getMdtFileAndType()
- FileFactory.deleteFile(timestampFile, timestampFileType)
- sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5
string) STORED BY 'org.apache.carbondata.format'""")
- sql("drop table carbontable")
- // perform file check
- assert(FileFactory.isFileExist(timestampFile, true) ||
- CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
- CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
- val (timestampFile2, timestampFileType2) = getMdtFileAndType()
- FileFactory.deleteFile(timestampFile2, timestampFileType2)
- sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5
string) STORED BY 'org.apache.carbondata.format'""")
- sql("drop table carbontable")
- // perform file check
- assert(FileFactory.isFileExist(timestampFile, true) ||
-
CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
- }
-
override def afterEach {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index febba35..e3f1d3f 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -71,7 +71,7 @@ object CarbonSessionUtil {
case _ =>
}
isRelationRefreshed =
- CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+ CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
case _ =>
}
}
@@ -79,12 +79,12 @@ object CarbonSessionUtil {
rtnRelation match {
case SubqueryAlias(_,
MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
- isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+ isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) =>
- isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+ isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 718bb74..fdaa3d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -106,6 +106,11 @@ object IndexServer extends ServerInterface {
sparkSession.sparkContext
.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
}
+ if (!request.getInvalidSegments.isEmpty) {
+ DistributedRDDUtils
+ .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
+ request.getInvalidSegments.asScala)
+ }
val splits = new DistributedPruneRDD(sparkSession, request).collect()
if (!request.isFallbackJob) {
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
@@ -113,11 +118,6 @@ object IndexServer extends ServerInterface {
if (request.isJobToClearDataMaps) {
DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName)
}
- if (!request.getInvalidSegments.isEmpty) {
- DistributedRDDUtils
- .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
- request.getInvalidSegments.asScala)
- }
new ExtendedBlockletWrapperContainer(splits.map(_._2),
request.isFallbackJob)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index e7a6d65..c13e7b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
@@ -215,38 +215,35 @@ object CarbonEnv {
databaseNameOp: Option[String],
tableName: String)
(sparkSession: SparkSession): CarbonTable = {
- refreshRelationFromCache(TableIdentifier(tableName, databaseNameOp))(sparkSession)
- val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
val catalog = getInstance(sparkSession).carbonMetaStore
- // refresh cache
- catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp))
-
- // try to get it from catch, otherwise lookup in catalog
- catalog.getTableFromMetadataCache(databaseName, tableName)
- .getOrElse(
- catalog
- .lookupRelation(databaseNameOp, tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- .carbonTable)
+ // if a refresh is required or the table does not exist in the cache, then look up the relation
+ if (isRefreshRequired(TableIdentifier(tableName, databaseNameOp))(sparkSession)) {
+ catalog
+ .lookupRelation(databaseNameOp, tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+ } else {
+ CarbonMetadata.getInstance().getCarbonTable(databaseNameOp.getOrElse(sparkSession
+ .catalog.currentDatabase), tableName)
+ }
}
- def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- var isRefreshed = false
+ /**
+ * @return true if the relation was changed and removed from the cache; false if there is
+ * no change in the relation.
+ */
+ def isRefreshRequired(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
val carbonEnv = getInstance(sparkSession)
- val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache(
- identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
- identifier.table)
- if (carbonEnv.carbonMetaStore
- .checkSchemasModifiedTimeAndReloadTable(identifier) && table.isDefined) {
- sparkSession.sessionState.catalog.refreshTable(identifier)
- val tablePath = table.get.getTablePath
- DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
+ val databaseName = identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val table = CarbonMetadata.getInstance().getCarbonTable(databaseName, identifier.table)
+ if (table == null) {
+ true
+ } else {
+ carbonEnv.carbonMetaStore.isSchemaRefreshed(AbsoluteTableIdentifier.from(table.getTablePath,
identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
- identifier.table, table.get.getTableInfo.getFactTable.getTableId))
- isRefreshed = true
+ identifier.table, table.getTableInfo.getFactTable.getTableId), sparkSession)
}
- isRefreshed
}
/**
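The effect of the rewrite is that getCarbonTable consults the CarbonMetadata cache first and only performs a full lookupRelation when isRefreshRequired reports a change. A simplified, self-contained model of that flow (the types and the lookup function are illustrative stand-ins, not CarbonData's API):

  import scala.collection.concurrent.TrieMap

  final case class TableRef(db: String, name: String)

  class MetaCacheSketch(lookup: (String, String) => TableRef) {
    private val cache = TrieMap.empty[String, TableRef]
    private def key(db: String, name: String) = s"${db}_$name".toLowerCase

    // When refreshRequired is false the entry is assumed present in the cache,
    // mirroring isRefreshRequired treating a cache miss as "refresh required".
    def getTable(db: String, name: String, refreshRequired: Boolean): TableRef =
      if (refreshRequired) {
        val t = lookup(db, name) // re-reads the schema file and repopulates the cache
        cache.put(key(db, name), t)
        t
      } else {
        cache(key(db, name))     // served straight from the in-memory metadata cache
      }
  }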
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index b4e60fb..1fa1337 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -67,7 +67,6 @@ case class CarbonDropDataMapCommand(
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetaStore
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
- catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
if (mainTable == null) {
mainTable = try {
CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 6a9ac0a..cebf606 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -105,8 +105,6 @@ case class RefreshCarbonTableCommand(
}
}
}
- // update the schema modified time
- metaStore.updateAndTouchSchemasUpdatedTime()
Seq.empty
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 566e44e..507fe02 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -60,14 +60,8 @@ case class CarbonAlterTableDropPartitionCommand(
setAuditTable(dbName, tableName)
setAuditInfo(Map("partition" -> model.partitionId))
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val tablePath = relation.carbonTable.getTablePath
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
- if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
- throwMetadataException(dbName, tableName, "table not found")
- }
- val carbonTable = relation.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sparkSession)
+ val tablePath = carbonTable.getTablePath
val partitionInfo = carbonTable.getPartitionInfo(tableName)
if (partitionInfo == null) {
throwMetadataException(dbName, tableName, "table is not a partition
table")
@@ -116,8 +110,6 @@ case class CarbonAlterTableDropPartitionCommand(
thriftTable,
null,
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime()
// sparkSession.catalog.refreshTable(tableName)
Seq.empty
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 72c3142..36ddce4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -69,7 +69,6 @@ case class CarbonAlterTableSplitPartitionCommand(
if (relation == null) {
throwMetadataException(dbName, tableName, "table not found")
}
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
if (null == (CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession))) {
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
throwMetadataException(dbName, tableName, "table not found")
@@ -107,8 +106,6 @@ case class CarbonAlterTableSplitPartitionCommand(
thriftTable,
null,
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime()
Seq.empty
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index db52361..480d1f7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -471,7 +471,7 @@ object PreAggregateUtil {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetaStore
- .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
+ .updateTableSchema(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
thriftTable,
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
@@ -539,23 +539,6 @@ object PreAggregateUtil {
}
}
- def getChildCarbonTable(databaseName: String, tableName: String)
- (sparkSession: SparkSession): Option[CarbonTable] = {
- val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
- if (carbonTable.isEmpty) {
- try {
- Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation].metaData.carbonTable)
- } catch {
- case _: Exception =>
- None
- }
- } else {
- carbonTable
- }
- }
-
/**
* Below method will be used to update logical plan
* this is required for creating pre aggregate tables,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index ff0177b..6b80bbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
+import org.apache.spark.sql.hive.CarbonFileMetastore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -183,6 +184,18 @@ case class CarbonDropTableCommand(
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePostEvent,
operationContext)
+ // Remove all invalid entries of carbonTable and the corresponding updated timestamp
+ // values from the cache. This case arises when there are two JDBC servers and one of them
+ // drops the table; the other server would otherwise not be able to clear its cache.
+ try {
+ CarbonEnv.getInstance(sparkSession).carbonMetaStore match {
+ case metastore: CarbonFileMetastore => metastore.removeStaleTimeStampEntries(sparkSession)
+ case _ =>
+ }
+ } catch {
+ case _: Exception =>
+ // Do nothing
+ }
} catch {
case ex: NoSuchTableException =>
if (!ifExistsSet) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index ea3bba8..7ab2d47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.net.URI
+import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,7 +38,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -87,59 +88,95 @@ object MatchLogicalRelation {
}
}
-class CarbonFileMetastore extends CarbonMetaStore {
+private object CarbonFileMetastore {
+
+ final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
+
+ def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ localTimeStamp: Long): Boolean = synchronized {
+ val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath)
+ val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath)
+ if (schemaCarbonFile.exists()) {
+ val oldTime = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableId))
+ val newTime = schemaCarbonFile.getLastModifiedTime
+ val isSchemaModified = oldTime match {
+ case Some(cacheTime) =>
+ cacheTime != newTime
+ case None => true
+ }
+ if (isSchemaModified) {
+ CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier.getTableUniqueName)
+ DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+ true
+ } else {
+ localTimeStamp != newTime
+ }
+ } else {
+ false
+ }
+ }
- @transient
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+ def updateTableSchemaModifiedTime(tableUniqueId: String, timeStamp: Long): Unit = {
+ tableModifiedTimeStore.put(tableUniqueId, timeStamp)
+ }
- val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
- tableModifiedTimeStore
- .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+ def getTableModifiedTime(tableUniqueId: String): Long = {
+ tableModifiedTimeStore.get(tableUniqueId)
+ }
- def nextQueryId: String = {
- System.nanoTime() + ""
+ def removeStaleEntries(invalidTableUniqueIds: List[String]) {
+ for (invalidKey <- invalidTableUniqueIds) {
+ tableModifiedTimeStore.remove(invalidKey)
+ }
}
+}
- val metadata = MetaData(new ArrayBuffer[CarbonTable]())
+class CarbonFileMetastore extends CarbonMetaStore {
+ @transient private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
/**
- * Create spark session from paramters.
- *
- * @param parameters
- * @param absIdentifier
- * @param sparkSession
+ * Create Carbon Relation by reading the schema file
*/
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
- val tables = getTableFromMetadataCache(database, tableName)
+ val tables = Option(CarbonMetadata.getInstance.getCarbonTable(database, tableName))
tables match {
case Some(t) =>
- CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
- case None =>
- readCarbonSchema(absIdentifier,
- !parameters.getOrElse("isTransactional", "true").toBoolean) match {
- case Some(meta) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(meta), meta)
- case None =>
- throw new NoSuchTableException(database, tableName)
+ if (isSchemaRefreshed(absIdentifier, sparkSession)) {
+ readCarbonSchema(absIdentifier, parameters)
+ } else {
+ CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
}
+ case None =>
+ readCarbonSchema(absIdentifier, parameters)
+ }
+ }
+
+ private def readCarbonSchema(absIdentifier: AbsoluteTableIdentifier,
+ parameters: Map[String, String]): CarbonRelation = {
+ readCarbonSchema(absIdentifier,
+ !parameters.getOrElse("isTransactional", "true").toBoolean) match {
+ case Some(meta) =>
+ CarbonRelation(absIdentifier.getDatabaseName, absIdentifier.getTableName,
+ CarbonSparkUtil.createSparkMeta(meta), meta)
+ case None =>
+ throw new NoSuchTableException(absIdentifier.getDatabaseName, absIdentifier.getTableName)
}
}
/**
- * This method will overwrite the existing schema and update it with the given details
- *
- * @param newTableIdentifier
- * @param thriftTableInfo
- * @param carbonStorePath
- * @param sparkSession
+ * This method will overwrite the existing schema and update it with the given details.
*/
- def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
carbonStorePath: String)(sparkSession: SparkSession): String = {
@@ -174,13 +211,17 @@ class CarbonFileMetastore extends CarbonMetaStore {
catalogTable.provider match {
case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
|| name.equalsIgnoreCase("carbondata")) => name
- case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+ case _ =>
+ CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+ throw new NoSuchTableException(database, tableIdentifier.table)
}
val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
catalogTable.location.toString, database, tableIdentifier.table)
CarbonEnv.getInstance(sparkSession).carbonMetaStore.
createCarbonRelation(catalogTable.storage.properties, identifier,
sparkSession)
- case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+ case _ =>
+ CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+ throw new NoSuchTableException(database, tableIdentifier.table)
}
// fire post event after lookup relation
@@ -193,22 +234,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
relation
}
- /**
- * This method will search for a table in the catalog metadata
- *
- * @param database
- * @param tableName
- * @return
- */
- def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
- metadata.readLock.lock()
- val ret = metadata.carbonTables
- .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
- table.getTableName.equalsIgnoreCase(tableName))
- metadata.readLock.unlock()
- ret
- }
-
def tableExists(
table: String,
databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
@@ -226,71 +251,60 @@ class CarbonFileMetastore extends CarbonMetaStore {
true
}
- def isTableInMetastore(identifier: AbsoluteTableIdentifier,
- sparkSession: SparkSession): Boolean = {
- sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
- .exists(_.table.equalsIgnoreCase(identifier.getTableName))
- }
-
-
private def readCarbonSchema(identifier: AbsoluteTableIdentifier,
inferSchema: Boolean): Option[CarbonTable] = {
-
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val tableName = identifier.getCarbonTableIdentifier.getTableName
val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
val tablePath = identifier.getTablePath
+ var schemaRefreshTime = System.currentTimeMillis()
val wrapperTableInfo =
- if (inferSchema) {
- val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
- val tblInfoFromCache = if (carbonTbl != null) {
- carbonTbl.getTableInfo
- } else {
- null
- }
-
- val thriftTableInfo : TableInfo = if (tblInfoFromCache != null) {
- // In case the TableInfo is present in the Carbon Metadata Cache
- // then get the tableinfo from the cache rather than infering from
- // the CarbonData file.
- schemaConverter
- .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
- } else {
- schemaConverter
- .fromWrapperToExternalTableInfo(SchemaReader
- .inferSchema(identifier, false),
- dbName, tableName)
- }
+ if (inferSchema) {
+ val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
+ val tblInfoFromCache = if (carbonTbl != null) {
+ carbonTbl.getTableInfo
+ } else {
+ null
+ }
- val wrapperTableInfo =
- schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
- wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
- wrapperTableInfo.setTransactionalTable(false)
- Some(wrapperTableInfo)
- } else {
- val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
- val fileType = FileFactory.getFileType(tableMetadataFile)
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val thriftTableInfo: TableInfo = if (tblInfoFromCache != null) {
+ // In case the TableInfo is present in the Carbon Metadata Cache
+ // then get the TableInfo from the cache rather than inferring it from
+ // the CarbonData file.
+ schemaConverter
+ .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
+ } else {
+ schemaConverter
+ .fromWrapperToExternalTableInfo(SchemaReader
+ .inferSchema(identifier, false),
+ dbName, tableName)
+ }
val wrapperTableInfo =
- schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+ schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
+ wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
+ wrapperTableInfo.setTransactionalTable(false)
Some(wrapperTableInfo)
} else {
- None
+ val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+ schemaRefreshTime = FileFactory
+ .getCarbonFile(CarbonTablePath.getSchemaFilePath(tablePath)).getLastModifiedTime
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val wrapperTableInfo =
+ schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+ Some(wrapperTableInfo)
+ } else {
+ None
+ }
}
- }
-
-
wrapperTableInfo.map { tableInfo =>
+ updateSchemasUpdatedTime(tableInfo.getFactTable.getTableId, schemaRefreshTime)
CarbonMetadata.getInstance().removeTable(tableUniqueName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- metadata.writeLock.lock()
- metadata.carbonTables += carbonTable
- metadata.writeLock.unlock()
- carbonTable
+ CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
}
}
@@ -319,17 +333,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
newTableIdentifier.getTableName,
oldTableIdentifier.getTableId)
val path = createSchemaThriftFile(newAbsoluteTableIdentifier,
thriftTableInfo)
- addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
-
+ addCarbonTableToCache(wrapperTableInfo, newAbsoluteTableIdentifier)
path
}
/**
* This method will is used to remove the evolution entry in case of failure.
- *
- * @param carbonTableIdentifier
- * @param thriftTableInfo
- * @param sparkSession
*/
def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
@@ -343,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val evolutionEntries =
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
- addTableCache(wrapperTableInfo, absoluteTableIdentifier)
+ addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier)
path
}
@@ -360,7 +369,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val childSchemaList = wrapperTableInfo.getDataMapSchemaList
childSchemaList.remove(childSchemaList.size() - 1)
val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
- addTableCache(wrapperTableInfo, absoluteTableIdentifier)
+ addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier)
path
}
@@ -393,7 +402,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
removeTableFromMetadata(tableInfo.getDatabaseName,
tableInfo.getFactTable.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- addTableCache(tableInfo, absoluteTableIdentifier)
+ addCarbonTableToCache(tableInfo, absoluteTableIdentifier)
CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
}
@@ -417,61 +426,27 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- updateSchemasUpdatedTime(touchSchemaFileSystemTime())
+ val modifiedTime = System.currentTimeMillis()
+ FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime)
+ updateSchemasUpdatedTime(identifier.getCarbonTableIdentifier.getTableId, modifiedTime)
identifier.getTablePath
}
- protected def addTableCache(
+ protected def addCarbonTableToCache(
tableInfo: table.TableInfo,
- absoluteTableIdentifier: AbsoluteTableIdentifier): ArrayBuffer[CarbonTable] = {
+ absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = {
val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
- CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
removeTableFromMetadata(identifier.getDatabaseName,
identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- metadata.writeLock.lock()
- metadata.carbonTables +=
- CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
- metadata.writeLock.unlock()
- metadata.carbonTables
}
/**
- * This method will remove the table meta from catalog metadata array
- *
- * @param dbName
- * @param tableName
+ * This method will remove the table meta from CarbonMetadata cache.
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
- carbonTableToBeRemoved match {
- case Some(carbonTable) =>
- metadata.writeLock.lock()
- metadata.carbonTables -= carbonTable
- metadata.writeLock.unlock()
- case None =>
- if (LOGGER.isDebugEnabled) {
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
- }
- }
CarbonMetadata.getInstance.removeTable(dbName, tableName)
}
- private def updateMetadataByWrapperTable(
- wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- wrapperTableInfo.getTableUniqueName)
- for (i <- metadata.carbonTables.indices) {
- metadata.writeLock.lock()
- if (wrapperTableInfo.getTableUniqueName.equals(
- metadata.carbonTables(i).getTableUniqueName)) {
- metadata.carbonTables(i) = carbonTable
- }
- metadata.writeLock.unlock()
- }
- }
-
def updateMetadataByThriftTable(schemaFilePath: String,
tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
@@ -479,7 +454,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo =
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
- updateMetadataByWrapperTable(wrapperTableInfo)
+ addCarbonTableToCache(wrapperTableInfo,
+ wrapperTableInfo.getOrCreateAbsoluteTableIdentifier())
}
@@ -496,133 +472,65 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
- def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
- (sparkSession: SparkSession) {
+ def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) {
val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
- val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
- val fileType = FileFactory.getFileType(metadataFilePath)
-
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- // while drop we should refresh the schema modified time so that if anything has changed
- // in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-
- CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime())
- // discard cached table info in cachedDataSourceTables
- val tableIdentifier = TableIdentifier(tableName, Option(dbName))
- sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
- DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
- SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
- removeTableFromMetadata(dbName, tableName)
- } else {
- if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
- removeTableFromMetadata(dbName, tableName)
- CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
- // discard cached table info in cachedDataSourceTables
- val tableIdentifier = TableIdentifier(tableName, Option(dbName))
- sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
- DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
- SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
- removeTableFromMetadata(dbName, tableName)
- }
- }
+ CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+ // discard cached table info in cachedDataSourceTables
+ val tableIdentifier = TableIdentifier(tableName, Option(dbName))
+ sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+ DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+ SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+ removeTableFromMetadata(dbName, tableName)
}
def isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
- val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
- table.map(_.getTableInfo.isTransactionalTable).getOrElse(true)
- }
-
- private def getTimestampFileAndType() = {
- var basePath = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
- CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
- basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath)
- val timestampFile = basePath + "/" +
CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
- val timestampFileType = FileFactory.getFileType(timestampFile)
- if (!FileFactory.isFileExist(basePath, timestampFileType)) {
- FileFactory
- .createDirectoryAndSetPermission(basePath,
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ val table = Option(CarbonMetadata.getInstance()
+ .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName))
+ table match {
+ case Some(t) => t.isTransactionalTable
+ case None => true
}
- (timestampFile, timestampFileType)
}
/**
* This method will put the updated timestamp of schema file in the table modified time store map
- *
- * @param timeStamp
*/
- private def updateSchemasUpdatedTime(timeStamp: Long) {
- tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
- }
-
- def updateAndTouchSchemasUpdatedTime() {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime())
+ private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) {
+ tableModifiedTimeStore.put(tableUniqueId, timeStamp)
+ CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp)
}
- /**
- * This method will check and create an empty schema timestamp file
- *
- * @return
- */
- private def touchSchemaFileSystemTime(): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType()
- if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.info(s"Creating timestamp file for $timestampFile")
- FileFactory
- .createNewFile(timestampFile,
- timestampFileType,
- true,
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
- }
- FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .setLastModifiedTime(System.currentTimeMillis())
- // since there is no guarantee that exact same set modified time returns when called
- // lastmodified time, so better get the time from file.
- FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .getLastModifiedTime
- }
-
- def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType()
- var isRefreshed = false
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- val lastModifiedTime =
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- if (!(lastModifiedTime ==
- tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
- metadata.writeLock.lock()
- metadata.carbonTables = metadata.carbonTables.filterNot(
- table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
- table.getDatabaseName
- .equalsIgnoreCase(tableIdentifier.database
- .getOrElse(SparkSession.getActiveSession.get.sessionState.catalog
- .getCurrentDatabase)))
- metadata.writeLock.unlock()
- updateSchemasUpdatedTime(lastModifiedTime)
- isRefreshed = true
+ override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ sparkSession: SparkSession): Boolean = {
+ val localTimeStamp = Option(tableModifiedTimeStore.get(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableId))
+ if (localTimeStamp.isDefined) {
+ if (CarbonFileMetastore.checkIfRefreshIsNeeded(absoluteTableIdentifier, localTimeStamp.get)) {
+ sparkSession.sessionState
+ .catalog.refreshTable(TableIdentifier(absoluteTableIdentifier.getTableName,
+ Some(absoluteTableIdentifier.getDatabaseName)))
+ true
+ } else {
+ false
}
+ } else {
+ true
}
- isRefreshed
}
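
The new isSchemaRefreshed above replaces the global mdt-file comparison with a per-table, three-way decision: a known but out-of-date timestamp triggers a catalog refresh and reports true; a known, current timestamp reports false; an unknown table id is treated as needing a reload. A self-contained model of that flow, where `latest` and `refreshCatalog` are stand-ins for the real calls:

  object SchemaRefreshSketch {
    def isSchemaRefreshed(tableId: String,
        cache: collection.Map[String, Long],
        latest: String => Long,
        refreshCatalog: () => Unit): Boolean =
      cache.get(tableId) match {
        case Some(local) if local != latest(tableId) =>
          refreshCatalog(); true // stale: evict the cached relation, report refreshed
        case Some(_) => false    // up to date: no work needed
        case None    => true     // never seen: caller must reload the schema
      }
  }
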
override def isReadFromHiveMetaStore: Boolean = false
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
- metadata.readLock.lock
- val ret = metadata.carbonTables.clone()
- metadata.readLock.unlock
- ret
+ CarbonMetadata.getInstance().getAllTables.asScala
}
@@ -672,4 +580,21 @@ class CarbonFileMetastore extends CarbonMetaStore {
case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
}
}
+
+ def removeStaleTimeStampEntries(sparkSession: SparkSession): Unit = {
+ val tablesList = sparkSession.sessionState.catalog.listDatabases().flatMap {
+ database =>
+ sparkSession.sessionState.catalog.listTables(database)
+ .map(table => s"${ database }_${ table.table
}".toLowerCase(Locale.getDefault()))
+ }
+ val invalidTableIds = CarbonMetadata.getInstance().getAllTables.asScala.collect {
+ case carbonTable if !tablesList.contains(carbonTable.getTableUniqueName
+ .toLowerCase(Locale.getDefault())) =>
+ CarbonMetadata.getInstance()
+ .removeTable(carbonTable.getTableUniqueName.toLowerCase(Locale.getDefault()))
+ carbonTable.getTableId
+ }
+ CarbonFileMetastore.removeStaleEntries(invalidTableIds.toList)
+ }
+
}
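
removeStaleTimeStampEntries sweeps the in-memory caches against the live session catalog: any cached "<db>_<table>" unique name the catalog no longer lists is evicted, and its table id is collected so the per-table timestamp entries can be dropped too. A runnable model with plain maps standing in for CarbonMetadata and the timestamp store:

  import java.util.Locale

  object StaleEntrySweep {
    def removeStaleEntries(
        catalogTables: Seq[(String, String)], // (database, table) pairs from the catalog
        cached: Map[String, String]           // uniqueName -> tableId
    ): (Map[String, String], List[String]) = {
      val live = catalogTables
        .map { case (db, t) => s"${db}_$t".toLowerCase(Locale.getDefault) }
        .toSet
      val (keep, stale) = cached.partition { case (name, _) =>
        live.contains(name.toLowerCase(Locale.getDefault))
      }
      (keep, stale.values.toList) // surviving cache, plus ids whose timestamps to purge
    }
  }
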
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index c8c7d31..a0a8c0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -63,6 +63,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
carbonRelation
}
+ override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ sparkSession: SparkSession): Boolean = true
override def isTablePathExists(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): Boolean = {
@@ -78,7 +80,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
val tableIdentifier = TableIdentifier(tableName, Option(dbName))
@@ -88,11 +89,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
removeTableFromMetadata(dbName, tableName)
}
- override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
- // do nothing
- false
- }
-
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
// Todo
Seq()
@@ -140,7 +136,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
* @param carbonTablePath
* @param sparkSession
*/
- override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+ override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
carbonTablePath: String)(sparkSession: SparkSession): String = {
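
The hive-backed metastore overrides isSchemaRefreshed to a constant true because Hive is the source of truth and nothing is cached locally, while the file metastore keeps the timestamp check. A sketch of the two flavours behind one contract, with stand-in types rather than the real trait:

  object MetaStoreFlavours {
    trait MetaStoreModel { def isSchemaRefreshed(tableId: String): Boolean }

    class FileBacked(cache: collection.Map[String, Long], latest: String => Long)
        extends MetaStoreModel {
      def isSchemaRefreshed(tableId: String): Boolean =
        cache.get(tableId).forall(_ != latest(tableId)) // unknown or stale -> reload
    }

    object HiveBacked extends MetaStoreModel {
      def isSchemaRefreshed(tableId: String): Boolean = true // hive is authoritative
    }
  }
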
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index f97a8ae..95efa6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -82,7 +82,7 @@ trait CarbonMetaStore {
* @param carbonStorePath
* @param sparkSession
*/
- def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
carbonStorePath: String)(sparkSession: SparkSession): String
@@ -135,9 +135,8 @@ trait CarbonMetaStore {
def dropTable(tableIdentifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession)
- def updateAndTouchSchemasUpdatedTime()
-
- def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean
+ def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ sparkSession: SparkSession): Boolean
def isReadFromHiveMetaStore: Boolean
@@ -147,8 +146,6 @@ trait CarbonMetaStore {
carbonTable: CarbonTable
): org.apache.carbondata.format.TableInfo
- def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
-
/**
* Method will be used to retrieve or create carbon data source relation
*
@@ -173,6 +170,7 @@ trait CarbonMetaStore {
val df: DataFrame = Dataset.ofRows(sparkSession, query)
df.schema
}
+
}
/**
* Factory for Carbon metastore
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index cb3ae29..a9cb652 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -69,9 +69,6 @@ object CleanFiles {
forceTableClean = args(2).toBoolean
}
val spark = TableAPIUtil.spark(storePath, s"CleanFiles:
$dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-
cleanFiles(spark, dbName, tableName, forceTableClean)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 0a3a870..91203a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -58,8 +58,6 @@ object Compaction {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction:
$dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
compaction(spark, dbName, tableName, compactionType)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 90a37f6..6149421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -45,8 +45,6 @@ object DeleteSegmentByDate {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate:
$dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 15bec02..023c7bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -50,8 +50,6 @@ object DeleteSegmentById {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById:
$dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index efaa191..ef9a931 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -82,8 +82,6 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader:
$dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 26f778e..75e7d89 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -125,9 +125,9 @@ class CarbonHiveSessionCatalog(
rtnRelation match {
case SubqueryAlias(_,
LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
- case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+ case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
case _ =>
}
@@ -138,31 +138,6 @@ class CarbonHiveSessionCatalog(
}
}
- private def refreshRelationFromCache(identifier: TableIdentifier,
- alias: Option[String],
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
- var isRefreshed = false
- val storePath = CarbonProperties.getStorePath
- carbonEnv.carbonMetaStore.
- checkSchemasModifiedTimeAndReloadTable(identifier)
-
- val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache(
- carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getTableName)
- if (table.isEmpty || (table.isDefined &&
- table.get.getTableLastUpdatedTime !=
- carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
- refreshTable(identifier)
- DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(storePath,
- identifier.database.getOrElse("default"),
- identifier.table))
- isRefreshed = true
- logInfo(s"Schema changes have been detected for table: $identifier")
- }
- isRefreshed
- }
-
/**
* returns hive client from session state
*
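
The removed refreshRelationFromCache compared getTableLastUpdatedTime against a cached relation by hand; after this hunk the session catalog delegates to a single predicate (CarbonEnv.isRefreshRequired in the diff). A self-contained model of the simplified lookup, with stand-in types rather than the real catalog API:

  object RelationRefreshSketch {
    final case class Name(db: Option[String], table: String)

    // Rebuild the relation only when the predicate says the cached one is stale.
    def lookupRelation[R](name: Name,
        cached: Name => R,
        rebuild: Name => R,
        isRefreshRequired: Name => Boolean): R =
      if (isRefreshRequired(name)) rebuild(name) else cached(name)
  }
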
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index dd1fa0f..d368a8e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -133,10 +133,10 @@ class AlterTableColumnRenameTestCase extends Spark2QueryTest with BeforeAndAfter
sql("alter table rename change empname name string")
sql("update rename set (name) = ('joey') where workgroupcategory =
'developer'").show()
sql("insert into rename select
20,'bill','PM','01-12-2015',3,'manager',14,'Learning',928479,'01-01-2016','30-11-2016',75,94,13547")
- val df1 = sql("select * from rename where name = 'joey'")
+ val df1Count = sql("select * from rename where name = 'joey'").count
sql("alter table rename change name empname string")
val df2 = sql("select * from rename where empname = 'joey'")
- assert(df1.count() == df2.count())
+ assert(df1Count == df2.count())
sql("delete from rename where empname = 'joey'")
val df3 = sql("select empname from rename")
sql("alter table rename change empname newname string")