[CARBONDATA-2026] Fix all issues and testcases when enabling carbon hive metastore
This closes #1799 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7504a5c5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7504a5c5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7504a5c5 Branch: refs/heads/carbonstore Commit: 7504a5c51fbebb682d4917e91e927933b113280a Parents: 7b5db58 Author: ravipesala <[email protected]> Authored: Sun Jan 14 18:59:09 2018 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Wed Jan 17 14:53:46 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 8 + .../core/metadata/schema/table/TableSchema.java | 12 +- .../apache/carbondata/core/util/CarbonUtil.java | 36 ++-- .../hadoop/api/CarbonTableInputFormat.java | 4 +- .../hadoop/util/ObjectSerializationUtil.java | 21 +-- integration/spark-common-cluster-test/pom.xml | 2 + .../sdv/register/TestRegisterCarbonTable.scala | 145 +++++++------- integration/spark-common-test/pom.xml | 2 + .../DBLocationCarbonTableTestCase.scala | 8 +- .../StandardPartitionTableLoadingTestCase.scala | 12 +- .../spark/rdd/AlterTableLoadPartitionRDD.scala | 2 +- .../spark/rdd/CarbonIUDMergerRDD.scala | 3 + .../spark/rdd/CarbonScanPartitionRDD.scala | 4 +- .../command/carbonTableSchemaCommon.scala | 9 + .../org/apache/spark/util/PartitionUtils.scala | 19 +- integration/spark2/pom.xml | 2 + .../CarbonAlterTableDropPartitionCommand.scala | 9 +- .../CarbonAlterTableSplitPartitionCommand.scala | 10 +- .../preaaggregate/PreAggregateListeners.scala | 3 +- .../preaaggregate/PreAggregateUtil.scala | 13 ++ .../spark/sql/hive/CarbonFileMetastore.scala | 2 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 6 +- .../apache/spark/sql/hive/CarbonMetaStore.scala | 5 + .../sql/hive/CarbonPreAggregateRules.scala | 2 +- .../register/TestRegisterCarbonTable.scala | 188 ++++++++++--------- .../AlterTableValidationTestCase.scala | 2 +- 
pom.xml | 1 + .../carbondata/streaming/StreamHandoffRDD.scala | 1 + .../streaming/StreamSinkFactory.scala | 9 +- 29 files changed, 334 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 74dfef6..dd65b84 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -163,6 +163,14 @@ public class CarbonTable implements Serializable { columnSchema.getPrecision(), columnSchema.getScale())); } } + if (tableInfo.getFactTable().getPartitionInfo() != null) { + for (ColumnSchema columnSchema : tableInfo.getFactTable().getPartitionInfo() + .getColumnSchemaList()) { + columnSchema.setDataType(DataTypeUtil + .valueOf(columnSchema.getDataType(), columnSchema.getPrecision(), + columnSchema.getScale())); + } + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java index f03e8b6..8fdfbab 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; 
import java.io.IOException; import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.SchemaEvolution; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonUtil; /** * Persisting the table information @@ -272,15 +274,21 @@ public class TableSchema implements Serializable, Writable { * */ public DataMapSchema buildChildSchema(String dataMapName, String className, String databaseName, - String queryString, String queryType) { + String queryString, String queryType) throws UnsupportedEncodingException { RelationIdentifier relationIdentifier = new RelationIdentifier(databaseName, tableName, tableId); Map<String, String> properties = new HashMap<>(); - properties.put("CHILD_SELECT QUERY", queryString); + properties.put("CHILD_SELECT QUERY", + CarbonUtil.encodeToString( + queryString.trim().getBytes( + // replace = to with & as hive metastore does not allow = inside. 
For base 64 + // only = is allowed as special character , so replace with & + CarbonCommonConstants.DEFAULT_CHARSET)).replace("=","&")); properties.put("QUERYTYPE", queryType); DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className); dataMapSchema.setProperties(properties); + dataMapSchema.setChildSchema(this); dataMapSchema.setRelationIdentifier(relationIdentifier); return dataMapSchema; http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 0839ae9..e272932 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.ObjectInputStream; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -85,14 +86,13 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.format.BlockletHeader; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; import com.google.gson.Gson; import com.google.gson.GsonBuilder; - +import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; @@ -2032,16 +2032,6 @@ public final class CarbonUtil { 
return tableInfo; } - public static void writeThriftTableToSchemaFile(String schemaFilePath, - org.apache.carbondata.format.TableInfo tableInfo) throws IOException { - ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); - try { - thriftWriter.open(); - thriftWriter.write(tableInfo); - } finally { - thriftWriter.close(); - } - } public static void dropDatabaseDirectory(String databasePath) throws IOException, InterruptedException { @@ -2336,5 +2326,27 @@ public final class CarbonUtil { return false; } + /** + * Convert the bytes to base64 encode string + * @param bytes + * @return + * @throws UnsupportedEncodingException + */ + public static String encodeToString(byte[] bytes) throws UnsupportedEncodingException { + return new String(Base64.encodeBase64(bytes), + CarbonCommonConstants.DEFAULT_CHARSET); + } + + /** + * Decode the base64 encoded string to bytes + * @param objectString + * @return + * @throws UnsupportedEncodingException + */ + public static byte[] decodeStringToBytes(String objectString) + throws UnsupportedEncodingException { + return Base64.decodeBase64(objectString.getBytes(CarbonCommonConstants.DEFAULT_CHARSET)); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 0fb17c1..36c5f57 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -140,7 +140,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { public static void setTableInfo(Configuration configuration, TableInfo tableInfo) throws IOException { if (null != tableInfo) { - configuration.set(TABLE_INFO, 
ObjectSerializationUtil.encodeToString(tableInfo.serialize())); + configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); } } @@ -155,7 +155,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { TableInfo output = new TableInfo(); output.readFields( new DataInputStream( - new ByteArrayInputStream(ObjectSerializationUtil.decodeStringToBytes(tableInfoStr)))); + new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); return output; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java index ea3246f..d97df2d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java @@ -16,13 +16,16 @@ */ package org.apache.carbondata.hadoop.util; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,13 +69,9 @@ public class ObjectSerializationUtil { } } - return encodeToString(baos.toByteArray()); + return CarbonUtil.encodeToString(baos.toByteArray()); } - public static String encodeToString(byte[] bytes) throws UnsupportedEncodingException { - return new 
String(Base64.encodeBase64(bytes), - CarbonCommonConstants.DEFAULT_CHARSET); - } /** * Converts Base64 string to object. @@ -86,7 +85,7 @@ public class ObjectSerializationUtil { return null; } - byte[] bytes = decodeStringToBytes(objectString); + byte[] bytes = CarbonUtil.decodeStringToBytes(objectString); ByteArrayInputStream bais = null; GZIPInputStream gis = null; @@ -116,8 +115,4 @@ public class ObjectSerializationUtil { } } - public static byte[] decodeStringToBytes(String objectString) - throws UnsupportedEncodingException { - return Base64.decodeBase64(objectString.getBytes(CarbonCommonConstants.DEFAULT_CHARSET)); - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common-cluster-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml index 95a719c..8af8b14 100644 --- a/integration/spark-common-cluster-test/pom.xml +++ b/integration/spark-common-cluster-test/pom.xml @@ -122,6 +122,7 @@ <argLine>-Xmx6g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> <systemProperties> <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> <failIfNoTests>false</failIfNoTests> </configuration> @@ -145,6 +146,7 @@ <java.awt.headless>true</java.awt.headless> <spark.master.url>${spark.master.url}</spark.master.url> <hdfs.url>${hdfs.url}</hdfs.url> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala index 45e3e50..e3620f7 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala @@ -19,11 +19,10 @@ package org.apache.carbondata.cluster.sdv.register import java.io.IOException import org.scalatest.BeforeAndAfterAll - import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -83,10 +82,12 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("use carbon") sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } checkAnswer(sql("select count(*) from carbontable"), Row(1)) checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) } @@ -97,10 +98,12 @@ class TestRegisterCarbonTable extends QueryTest with 
BeforeAndAfterAll { sql("use carbon") sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } checkAnswer(sql("select count(*) from carbontable"), Row(1)) checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) } @@ -114,17 +117,19 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'aa','aaa'") sql("insert into carbontable select 'a',10,'aa','aaa'") sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") - backUpData(dbLocationCustom, "carbontable") - backUpData(dbLocationCustom, "carbontable_preagg1") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - restoreData(dbLocationCustom, "carbontable_preagg1") - sql("refresh table carbontable") - checkAnswer(sql("select count(*) from carbontable"), Row(3)) - checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) - checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) - checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) - } + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + backUpData(dbLocationCustom, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + restoreData(dbLocationCustom, 
"carbontable_preagg1") + sql("refresh table carbontable") + } + checkAnswer(sql("select count(*) from carbontable"), Row(3)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) + checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) + checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + } test("register pre aggregate table test") { sql("drop database if exists carbon cascade") @@ -135,12 +140,14 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'aa','aaa'") sql("insert into carbontable select 'a',10,'aa','aaa'") sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") - backUpData(dbLocationCustom, "carbontable") - backUpData(dbLocationCustom, "carbontable_preagg1") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - restoreData(dbLocationCustom, "carbontable_preagg1") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + backUpData(dbLocationCustom, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + restoreData(dbLocationCustom, "carbontable_preagg1") + sql("refresh table carbontable") + } checkAnswer(sql("select count(*) from carbontable"), Row(3)) checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) @@ -156,18 +163,20 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'aa','aaa'") sql("insert into carbontable select 'a',10,'aa','aaa'") sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") - 
backUpData(dbLocationCustom, "carbontable") - backUpData(dbLocationCustom, "carbontable_preagg1") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - try { - sql("refresh table carbontable") - assert(false) - } catch { - case e : AnalysisException => - assert(true) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + backUpData(dbLocationCustom, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + try { + sql("refresh table carbontable") + assert(false) + } catch { + case e: AnalysisException => + assert(true) + } + restoreData(dbLocationCustom, "carbontable_preagg1") } - restoreData(dbLocationCustom, "carbontable_preagg1") } test("Update operation on carbon table should pass after registration or refresh") { @@ -177,16 +186,18 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") sql("insert into carbontable select 'b',1,'bb','bbb'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } // update operation sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show() checkAnswer( sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb")) + Seq(Row("a", 2, "aa", "aaa"), Row("b", 2, 
"bb", "bbb")) ) } @@ -197,15 +208,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") sql("insert into carbontable select 'b',1,'bb','bbb'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } // delete operation sql("""delete from carbontable where c3 = 'aa'""").show checkAnswer( sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("b",1,"bb","bbb")) + Seq(Row("b", 1, "bb", "bbb")) ) sql("drop table carbontable") } @@ -217,15 +230,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") sql("insert into carbontable select 'b',1,'bb','bbb'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } sql("Alter table carbontable add columns(c4 string) " + "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c4', 'DEFAULT.VALUE.c4'='def')") checkAnswer( sql("""select c1,c2,c3,c5,c4 from carbon.carbontable"""), - Seq(Row("a",1,"aa","aaa","def"), 
Row("b",1,"bb","bbb","def")) + Seq(Row("a", 1, "aa", "aaa", "def"), Row("b", 1, "bb", "bbb", "def")) ) sql("drop table carbontable") } @@ -237,14 +252,16 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") sql("insert into carbontable select 'b',1,'bb','bbb'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } sql("Alter table carbontable change c2 c2 long") checkAnswer( sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("a",1,"aa","aaa"), Row("b",1,"bb","bbb")) + Seq(Row("a", 1, "aa", "aaa"), Row("b", 1, "bb", "bbb")) ) sql("drop table carbontable") } @@ -256,14 +273,16 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("insert into carbontable select 'a',1,'aa','aaa'") sql("insert into carbontable select 'b',1,'bb','bbb'") - backUpData(dbLocationCustom, "carbontable") - sql("drop table carbontable") - restoreData(dbLocationCustom, "carbontable") - sql("refresh table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + backUpData(dbLocationCustom, "carbontable") + sql("drop table carbontable") + restoreData(dbLocationCustom, "carbontable") + sql("refresh table carbontable") + } sql("Alter table carbontable drop columns(c2)") checkAnswer( sql("""select * from carbon.carbontable"""), 
- Seq(Row("a","aa","aaa"), Row("b","bb","bbb")) + Seq(Row("a", "aa", "aaa"), Row("b", "bb", "bbb")) ) sql("drop table carbontable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index 2580b95..b8d7c05 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -173,6 +173,7 @@ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> <systemProperties> <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> <failIfNoTests>false</failIfNoTests> </configuration> @@ -193,6 +194,7 @@ </environmentVariables> <systemProperties> <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala index 8e8935f..eb26276 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala @@ -19,7 +19,7 @@ package 
org.apache.carbondata.spark.testsuite.dblocation import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -201,7 +201,8 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("drop table carbontable") // perform file check - assert(FileFactory.isFileExist(timestampFile, timestampFileType, true)) + assert(FileFactory.isFileExist(timestampFile, timestampFileType, true) || + CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, @@ -211,7 +212,8 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql("drop table carbontable") // perform file check - assert(FileFactory.isFileExist(timestampFile, timestampFileType, true)) + assert(FileFactory.isFileExist(timestampFile, timestampFileType, true) || + CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) } override def afterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 25e73c4..31d2598 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -22,7 +22,7 @@ import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.commons.io.FileUtils import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -391,10 +391,12 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/")) backUpData(dblocation, "restorepartition") sql("drop table restorepartition") - restoreData(dblocation, "restorepartition") - sql("refresh table restorepartition") - checkAnswer(sql("select count(*) from restorepartition"), rows) - checkAnswer(sql("show partitions restorepartition"), partitions) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "restorepartition") + sql("refresh table restorepartition") + checkAnswer(sql("select count(*) from restorepartition"), rows) + checkAnswer(sql("show partitions restorepartition"), partitions) + } } test("test case sensitive on partition columns") { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index 1ecab9f..35a8ea7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -107,7 +107,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, true } else { val segmentProperties = PartitionUtils.getSegmentProperties(identifier, - segmentId, partitionIds.toList, oldPartitionIds, partitionInfo) + segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable) val processor = new RowResultProcessor( carbonTable, carbonLoadModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 2f190b5..26fa037 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -64,6 +64,9 @@ class CarbonIUDMergerRDD[K, V]( CarbonTableInputFormat.setSegmentsToAccess( job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava) + CarbonTableInputFormat.setTableInfo( 
+ job.getConfiguration, + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) // get splits val splits = format.getSplits(job) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 4439db5..1a8943b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -45,6 +45,7 @@ import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResult import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonCompactionUtil import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor @@ -95,6 +96,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, val job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier, partitionIds.toList.asJava, job) + CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) job.getConfiguration.set("query.id", queryId) val splits = format.getSplitsOfOneSegment(job, segmentId, @@ -152,7 +154,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, } } val segmentProperties = 
PartitionUtils.getSegmentProperties(absoluteTableIdentifier, - segmentId, partitionIds.toList, oldPartitionIdList, partitionInfo) + segmentId, partitionIds.toList, oldPartitionIdList, partitionInfo, carbonTable) val partColIdx = getPartitionColumnIndex(partitionColumnName, segmentProperties) indexInitialise() for (iterator <- result.asScala) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 37eea60..9a0098e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -275,6 +275,15 @@ class AlterTableColumnSchemaGenerator( sys.error(s"Duplicate column found with name: $name") }) + if (newCols.exists(_.getDataType.isComplexType)) { + LOGGER.error(s"Complex column cannot be added") + LOGGER.audit( + s"Validation failed for Create/Alter Table Operation " + + s"for ${ dbName }.${ alterTableModel.tableName }. 
" + + s"Complex column cannot be added") + sys.error(s"Complex column cannot be added") + } + val columnValidator = CarbonSparkFactory.getCarbonColumnValidator columnValidator.validateColumns(allColumns) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 82052aa..0498b25 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -32,9 +32,11 @@ import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlock import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.util.CommonUtil @@ -128,9 +130,16 @@ object PartitionUtils { */ def getSegmentProperties(identifier: AbsoluteTableIdentifier, segmentId: String, partitionIds: List[String], oldPartitionIdList: List[Int], - partitionInfo: PartitionInfo): SegmentProperties = { + partitionInfo: PartitionInfo, + carbonTable: CarbonTable): SegmentProperties = { val tableBlockInfoList = - 
getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList, partitionInfo) + getPartitionBlockList( + identifier, + segmentId, + partitionIds, + oldPartitionIdList, + partitionInfo, + carbonTable) val footer = CarbonUtil.readMetadatFile(tableBlockInfoList.get(0)) val segmentProperties = new SegmentProperties(footer.getColumnInTable, footer.getSegmentInfo.getColumnCardinality) @@ -139,11 +148,13 @@ object PartitionUtils { def getPartitionBlockList(identifier: AbsoluteTableIdentifier, segmentId: String, partitionIds: List[String], oldPartitionIdList: List[Int], - partitionInfo: PartitionInfo): java.util.List[TableBlockInfo] = { + partitionInfo: PartitionInfo, + carbonTable: CarbonTable): java.util.List[TableBlockInfo] = { val jobConf = new JobConf(new Configuration) val job = new Job(jobConf) val format = CarbonInputFormatUtil .createCarbonTableInputFormat(identifier, partitionIds.asJava, job) + CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) val splits = format.getSplitsOfOneSegment(job, segmentId, oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo) val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) @@ -163,7 +174,7 @@ object PartitionUtils { val tablePath = carbonLoadModel.getTablePath val tableBlockInfoList = getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds, - partitionInfo).asScala + partitionInfo, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable).asScala val pathList: util.List[String] = new util.ArrayList[String]() val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val carbonTablePath = new CarbonTablePath(carbonTableIdentifier, tablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index 472435e..60cb61f 100644 --- 
a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -117,6 +117,7 @@ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> <systemProperties> <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> <failIfNoTests>false</failIfNoTests> </configuration> @@ -137,6 +138,7 @@ </environmentVariables> <systemProperties> <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index d85e064..114c25d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -111,9 +111,12 @@ case class CarbonAlterTableDropPartitionCommand( schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) .setTime_stamp(System.currentTimeMillis) - carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable, - dbName, tableName, tablePath) - CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable) + carbonMetaStore.updateTableSchemaForAlter( + 
table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + thriftTable, + null, + table.getAbsoluteTableIdentifier.getTablePath)(sparkSession) // update the schema modified time carbonMetaStore.updateAndTouchSchemasUpdatedTime() // sparkSession.catalog.refreshTable(tableName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 65a0af3..bafc96a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -101,12 +101,14 @@ case class CarbonAlterTableSplitPartitionCommand( wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis()) val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable, - dbName, tableName, tablePath) - CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable) + carbonMetaStore + .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + thriftTable, + null, + table.getAbsoluteTableIdentifier.getTablePath)(sparkSession) // update the schema modified time carbonMetaStore.updateAndTouchSchemasUpdatedTime() - sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, 
Option(dbName))) Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index ea5cfed..17e2f2b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.AlterTableModel import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ @@ -57,7 +58,7 @@ object LoadPostAggregateListener extends OperationEventListener { val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) { - dataMapSchema.getProperties.get("CHILD_SELECT QUERY") + PreAggregateUtil.getChildQuery(dataMapSchema) } else { // for timeseries rollup policy val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 153c1a4..7f1f46f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -38,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.TableInfo import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -872,4 +873,16 @@ object PreAggregateUtil { } }.canonicalized.asInstanceOf[T] } + + /** + * Gives child query from schema + * @param aggDataMapSchema + * @return + */ + def getChildQuery(aggDataMapSchema: AggregationDataMapSchema): String = { + new String( + CarbonUtil.decodeStringToBytes( + aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")), + CarbonCommonConstants.DEFAULT_CHARSET) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 1c9ca5d..744fbd8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -376,12 +376,12 @@ class CarbonFileMetastore extends CarbonMetaStore { carbonTableToBeRemoved match { case Some(carbonTable) => metadata.carbonTables -= carbonTable - CarbonMetadata.getInstance.removeTable(dbName, tableName) case None => if (LOGGER.isDebugEnabled) { LOGGER.debug(s"No entry for table $tableName in database $dbName") } } + CarbonMetadata.getInstance.removeTable(dbName, tableName) } private def updateMetadataByWrapperTable( http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index a2d1064..54f58fc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -167,8 +167,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, newTablePath) - val dbName = oldTableIdentifier.getDatabaseName - val tableName = oldTableIdentifier.getTableName + val dbName = newTableIdentifier.getDatabaseName + val tableName = newTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] .getClient() @@ -177,7 +177,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { 
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) removeTableFromMetadata(dbName, tableName) CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath + newTablePath } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 7dc3d3f..eb59184 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.StructType +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.schema @@ -176,11 +177,15 @@ trait CarbonMetaStore { */ object CarbonMetaStoreFactory { + val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.hive.CarbonMetaStoreFactory") + def createCarbonMetaStore(conf: RuntimeConfig): CarbonMetaStore = { val readSchemaFromHiveMetaStore = readSchemaFromHive(conf) if (readSchemaFromHiveMetaStore) { + LOGGER.info("Hive based carbon metastore is enabled") new CarbonHiveMetaStore() } else { + LOGGER.info("File based carbon metastore is enabled") new CarbonFileMetastore() } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 299ed4d..5db4895 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -562,7 +562,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule if(null == aggDataMapSchema.getAggExpToColumnMapping) { // add preAGG UDF to avoid all the PreAggregate rule val childDataMapQueryString = parser.addPreAggFunction( - aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY")) + PreAggregateUtil.getChildQuery(aggDataMapSchema)) // get the logical plan val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan // getting all aggregate expression from query http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala index e48aaba..1b36e17 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala @@ -20,11 +20,10 @@ import java.io.{File, IOException} import org.apache.commons.io.FileUtils import org.apache.spark.sql.test.util.QueryTest 
-import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} /** * @@ -67,10 +66,12 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'a',1,'aa','aaa'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - checkAnswer(sql("select count(*) from carbontable"), Row(1)) - checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(1)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + } } test("register table test") { @@ -81,10 +82,12 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'a',1,'aa','aaa'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - checkAnswer(sql("select count(*) from carbontable"), Row(1)) - checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(1)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + } } test("register pre aggregate tables test") { @@ -99,13 +102,15 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { backUpData(dblocation, "carbontable") backUpData(dblocation, 
"carbontable_preagg1") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - restoreData(dblocation, "carbontable_preagg1") - sql("refresh table carbontable") - checkAnswer(sql("select count(*) from carbontable"), Row(3)) - checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) - checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) - checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + restoreData(dblocation, "carbontable_preagg1") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(3)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) + checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) + checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + } } test("register pre aggregate table test") { @@ -120,13 +125,15 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { backUpData(dblocation, "carbontable") backUpData(dblocation, "carbontable_preagg1") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - restoreData(dblocation, "carbontable_preagg1") - sql("refresh table carbontable") - checkAnswer(sql("select count(*) from carbontable"), Row(3)) - checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) - checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) - checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + restoreData(dblocation, "carbontable_preagg1") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), 
Row(3)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) + checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) + checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + } } test("register pre aggregate table should fail if the aggregate table not copied") { @@ -141,15 +148,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { backUpData(dblocation, "carbontable") backUpData(dblocation, "carbontable_preagg1") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - try { - sql("refresh table carbontable") - assert(false) - } catch { - case e : AnalysisException => - assert(true) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + try { + sql("refresh table carbontable") + assert(false) + } catch { + case e: AnalysisException => + assert(true) + } + restoreData(dblocation, "carbontable_preagg1") } - restoreData(dblocation, "carbontable_preagg1") } test("Update operation on carbon table should pass after registration or refresh") { @@ -161,15 +170,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'bb','bbb'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - // update operation - sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() - sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show() - checkAnswer( - sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb")) - ) + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + // update operation + sql("""update 
carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() + sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show() + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("a", 2, "aa", "aaa"), Row("b", 2, "bb", "bbb")) + ) + } } test("Update operation on carbon table") { @@ -185,15 +196,16 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge") backUpData(dblocation, "automerge") sql("drop table automerge") - restoreData(dblocation, "automerge") - sql("refresh table automerge") - // update operation - sql("""update carbon.automerge d set (d.id) = (d.id + 1) where d.id > 2""").show() - checkAnswer( - sql("select count(*) from automerge"), - Seq(Row(6)) - ) - // sql("drop table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "automerge") + sql("refresh table automerge") + // update operation + sql("""update carbon.automerge d set (d.id) = (d.id + 1) where d.id > 2""").show() + checkAnswer( + sql("select count(*) from automerge"), + Seq(Row(6)) + ) + } } test("Delete operation on carbon table") { @@ -205,15 +217,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'bb','bbb'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - // delete operation - sql("""delete from carbontable where c3 = 'aa'""").show - checkAnswer( - sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("b",1,"bb","bbb")) - ) - sql("drop table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + // delete operation + sql("""delete from carbontable where 
c3 = 'aa'""").show + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("b", 1, "bb", "bbb")) + ) + sql("drop table carbontable") + } } test("Alter table add column test") { @@ -225,15 +239,17 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'bb','bbb'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - sql("Alter table carbontable add columns(c4 string) " + - "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c4', 'DEFAULT.VALUE.c4'='def')") - checkAnswer( - sql("""select c1,c2,c3,c5,c4 from carbon.carbontable"""), - Seq(Row("a",1,"aa","aaa","def"), Row("b",1,"bb","bbb","def")) - ) - sql("drop table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + sql("Alter table carbontable add columns(c4 string) " + + "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c4', 'DEFAULT.VALUE.c4'='def')") + checkAnswer( + sql("""select c1,c2,c3,c5,c4 from carbon.carbontable"""), + Seq(Row("a", 1, "aa", "aaa", "def"), Row("b", 1, "bb", "bbb", "def")) + ) + sql("drop table carbontable") + } } test("Alter table change column datatype test") { @@ -245,14 +261,16 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'bb','bbb'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - sql("Alter table carbontable change c2 c2 long") - checkAnswer( - sql("""select c1,c2,c3,c5 from carbon.carbontable"""), - Seq(Row("a",1,"aa","aaa"), Row("b",1,"bb","bbb")) - ) - sql("drop table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + 
sql("refresh table carbontable") + sql("Alter table carbontable change c2 c2 long") + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("a", 1, "aa", "aaa"), Row("b", 1, "bb", "bbb")) + ) + sql("drop table carbontable") + } } test("Alter table drop column test") { @@ -264,14 +282,16 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("insert into carbontable select 'b',1,'bb','bbb'") backUpData(dblocation, "carbontable") sql("drop table carbontable") - restoreData(dblocation, "carbontable") - sql("refresh table carbontable") - sql("Alter table carbontable drop columns(c2)") - checkAnswer( - sql("""select * from carbon.carbontable"""), - Seq(Row("a","aa","aaa"), Row("b","bb","bbb")) - ) - sql("drop table carbontable") + if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + sql("Alter table carbontable drop columns(c2)") + checkAnswer( + sql("""select * from carbon.carbontable"""), + Seq(Row("a", "aa", "aaa"), Row("b", "bb", "bbb")) + ) + sql("drop table carbontable") + } } override def afterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala index 597982a..e89efdb 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala @@ -227,7 +227,7 @@ class AlterTableValidationTestCase extends 
Spark2QueryTest with BeforeAndAfterAl test("test adding complex datatype column") { try { sql("alter table restructure add columns(arr array<string>)") - sys.error("Exception should be thrown for complex column add") + assert(false, "Exception should be thrown for complex column add") } catch { case e: Exception => println(e.getMessage) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 07fd946..cdfcd75 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,7 @@ <hdfs.url>local</hdfs.url> <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name> <script.exetension>.sh</script.exetension> + <carbon.hive.based.metastore>false</carbon.hive.based.metastore> </properties> <repositories> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 4862604..b91be0c 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -145,6 +145,7 @@ class StreamHandoffRDD[K, V]( projection.addColumn(dataFields.get(index).getColName) } CarbonTableInputFormat.setColumnProjection(hadoopConf, projection) + CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) val format = new CarbonTableInputFormat[Array[Object]]() val model = format.getQueryModel(inputSplit, attemptContext) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7504a5c5/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index b720c1a..cecc18c 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -103,11 +103,16 @@ object StreamSinkFactory { * @return */ private def getStreamSegmentId(carbonTable: CarbonTable): String = { - val segmentId = StreamSegment.open(carbonTable) val carbonTablePath = CarbonStorePath .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) + val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath) + if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) { + // Create table directory path, in case of enabling hive metastore first load may not have + // table folder created. + FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType) + } + val segmentId = StreamSegment.open(carbonTable) val segmentDir = carbonTablePath.getSegmentDir("0", segmentId) - val fileType = FileFactory.getFileType(segmentDir) if (FileFactory.isFileExist(segmentDir, fileType)) { // recover fault StreamSegment.recoverSegmentIfRequired(segmentDir)
