Repository: carbondata Updated Branches: refs/heads/master f1c6dddec -> a89587e70
[CARBONDATA-1908][PARTITION] Support UPDATE/DELETE on partition tables. This closes #1681 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a89587e7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a89587e7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a89587e7 Branch: refs/heads/master Commit: a89587e70dddd55b44050423810776edaad26f0b Parents: f1c6ddd Author: ravipesala <[email protected]> Authored: Tue Dec 19 17:43:00 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Dec 21 16:39:09 2017 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonOutputCommitter.java | 30 +++- .../hadoop/api/CarbonTableInputFormat.java | 3 +- .../hadoop/api/CarbonTableOutputFormat.java | 13 +- .../iud/DeleteCarbonTableTestCase.scala | 21 +++ .../iud/UpdateCarbonTableTestCase.scala | 68 +++++++++ .../carbondata/spark/util/CommonUtil.scala | 2 +- .../management/CarbonLoadDataCommand.scala | 23 ++- ...rbonAlterTableDropHivePartitionCommand.scala | 150 +++++++++++++++++++ .../CarbonStandardAlterTableDropPartition.scala | 150 ------------------- .../datasources/CarbonFileFormat.scala | 9 +- .../sql/execution/strategy/DDLStrategy.scala | 4 +- .../loading/CarbonDataLoadConfiguration.java | 11 ++ .../loading/DataLoadProcessBuilder.java | 6 +- .../iterator/CarbonOutputIteratorWrapper.java | 61 +++++--- .../store/CarbonFactDataHandlerColumnar.java | 4 + .../store/CarbonFactDataHandlerModel.java | 8 + .../processing/util/CarbonLoaderUtil.java | 14 +- 17 files changed, 384 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 08fd1ac..6f5d0e4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -18,12 +18,18 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.metadata.PartitionMapFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -76,18 +82,32 @@ public class CarbonOutputCommitter extends FileOutputCommitter { super.commitJob(context); boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail(); String segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()); // Merge all partition files into a single file. 
new PartitionMapFileStore().mergePartitionMapFiles(segmentPath, loadModel.getFactTimeStamp() + ""); - LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail(); CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(), true); - CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), - loadModel.getCarbonDataLoadSchema().getCarbonTable()); - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); + CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + long segmentSize = CarbonLoaderUtil + .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable); + if (segmentSize > 0) { + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); + String updateTime = + context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null); + if (updateTime != null) { + Set<String> segmentSet = new HashSet<>( + new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()) + .getValidAndInvalidSegments().getValidSegments()); + CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true, + new ArrayList<String>()); + } + } else { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel); + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 6a2349a..22eca9f 100644 --- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -128,6 +128,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; public static final String PARTITIONS_TO_PRUNE = "mapreduce.input.carboninputformat.partitions.to.prune"; + public static final String UPADTE_T = + "mapreduce.input.carboninputformat.partitions.to.prune"; // a cache for carbon table, it will be used in task side private CarbonTable carbonTable; @@ -915,7 +917,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { * @param identifier * @return * @throws IOException - * @throws KeyGenException */ public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier, List<String> partitions) throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index f11cb35..bd70e41 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -40,6 +40,8 @@ import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWra import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.io.NullWritable; @@ -87,6 +89,13 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri "mapreduce.carbontable.dict.server.host"; public static final String DICTIONARY_SERVER_PORT = "mapreduce.carbontable.dict.server.port"; + /** + * Set the update timestamp if user sets in case of update query. It needs to be updated + * in load status update time + */ + public static final String UPADTE_TIMESTAMP = "mapreduce.carbontable.update.timestamp"; + + private static final Log LOG = LogFactory.getLog(CarbonTableOutputFormat.class); private CarbonOutputCommitter committer; @@ -373,15 +382,17 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - iteratorWrapper.close(); + iteratorWrapper.closeWriter(); try { future.get(); } catch (ExecutionException e) { + LOG.error("Error while loading data", e); throw new InterruptedException(e.getMessage()); } finally { executorService.shutdownNow(); dataLoadExecutor.close(); } + LOG.info("Closed partition writer task " + taskAttemptContext.getTaskAttemptID()); } public CarbonLoadModel getLoadModel() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index efbe807..deaad20 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -104,6 +104,27 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { ) } + test("partition delete data from carbon table with alias [where clause ]") { + sql("drop table if exists iud_db.dest") + sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest""") + sql("""delete from iud_db.dest d where d.c1 = 'a'""").show + checkAnswer( + sql("""select c2 from iud_db.dest"""), + Seq(Row(2), Row(3),Row(4), Row(5)) + ) + } + test("partition delete data from carbon table[where clause ]") { + sql("""drop table if exists iud_db.dest""") + sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest""") + sql("""delete from iud_db.dest where c2 = 2""").show + checkAnswer( + sql("""select c1 from iud_db.dest"""), + Seq(Row("a"), Row("c"), Row("d"), Row("e")) + ) + } + test("Records more than one pagesize after delete operation ") { sql("DROP TABLE IF EXISTS carbon2") import sqlContext.implicits._ http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index f265e75..25a4999 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -569,6 +569,74 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.defaultValueIsPersistEnabled) } + test("partition test update operation with 0 rows updation.") { + sql("""drop table if exists iud.zerorows_part""").show + sql("""create table iud.zerorows_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows_part""") + sql("""update iud.zerorows_part d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() + sql("""update iud.zerorows_part d set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show() + checkAnswer( + sql("""select c1,c2,c3,c5 from iud.zerorows_part"""), + Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee")) + ) + sql("""drop table iud.zerorows_part""").show + + } + + + test("partition update carbon table[select from source table with where and exist]") { + sql("""drop table if exists iud.dest11_part""").show + sql("""create table iud.dest11_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11_part""") + sql("""update iud.dest11_part d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() + checkAnswer( + sql("""select c3,c5 from iud.dest11_part"""), + Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music")) + ) + sql("""drop table iud.dest11_part""").show + } + + test("partition update carbon table[using destination table columns with where and 
exist]") { + sql("""drop table if exists iud.dest22_part""") + sql("""create table iud.dest22_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22_part""") + checkAnswer( + sql("""select c2 from iud.dest22_part where c1='a'"""), + Seq(Row(1)) + ) + sql("""update iud.dest22_part d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() + checkAnswer( + sql("""select c2 from iud.dest22_part where c1='a'"""), + Seq(Row(2)) + ) + sql("""drop table if exists iud.dest22_part""") + } + + test("partition update carbon table without alias in set columns") { + sql("""drop table if exists iud.dest33_part""") + sql("""create table iud.dest33_part (c2 int,c3 string,c5 string) PARTITIONED BY(c1 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33_part""") + sql("""update iud.dest33_part d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() + checkAnswer( + sql("""select c3,c5 from iud.dest33_part where c1='a'"""), + Seq(Row("MGM","Disco")) + ) + sql("""drop table if exists iud.dest33_part""") + } + + test("partition update carbon table without alias in set columns with mulitple loads") { + sql("""drop table if exists iud.dest33_part""") + sql("""create table iud.dest33_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33_part""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33_part""") + sql("""update iud.dest33_part d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() + checkAnswer( + sql("""select c3,c5 from iud.dest33_part where c1='a'"""), + Seq(Row("MGM","Disco"),Row("MGM","Disco")) + ) + 
sql("""drop table if exists iud.dest33_part""") + } + override def afterAll { sql("use default") sql("drop database if exists iud cascade") http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index ae30300..dba8f0e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -639,7 +639,7 @@ object CommonUtil { def readLoadMetadataDetails(model: CarbonLoadModel): Unit = { val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath val details = SegmentStatusManager.readLoadMetadata(metadataPath) - model.setLoadMetadataDetails(details.toList.asJava) + model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava)) } def configureCSVInputFormat(configuration: Configuration, http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 69be362..7492951 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ 
-38,6 +38,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.CarbonRelation @@ -490,7 +491,24 @@ case class CarbonLoadDataCommand( } InternalRow.fromSeq(data) } - LogicalRDD(attributes, rdd)(sparkSession) + if (updateModel.isDefined) { + sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + // In case of update, we don't need the segment id column in case of partitioning + val dropAttributes = attributes.dropRight(1) + val finalOutput = relation.output.map { attr => + dropAttributes.find { d => + val index = d.name.lastIndexOf("-updatedColumn") + if (index > 0) { + d.name.substring(0, index).equalsIgnoreCase(attr.name) + } else { + d.name.equalsIgnoreCase(attr.name) + } + }.get + } + Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) + } else { + LogicalRDD(attributes, rdd)(sparkSession) + } } else { var timeStampformatString = carbonLoadModel.getTimestampformat @@ -618,6 +636,9 @@ case class CarbonLoadDataCommand( options += (("onepass", loadModel.getUseOnePass.toString)) options += (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) + if (updateModel.isDefined) { + options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString)) + } val hdfsRelation = HadoopFsRelation( location = catalog, partitionSchema = partitionSchema, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala new file mode 100644 index 0000000..c68e43c --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.partition + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} + +/** + * Drop the partitions from hive and carbon store. It drops the partitions in the following steps + * 1. Drop the partitions from carbon store, it just creates one new mapper file in each segment + * with uniqueid. + * 2. Drop partitions from hive. + * 3. If any of the above steps fails then roll back the newly created files + * 4. After success of steps 1 and 2, it commits the files by removing the old files. + * Here it does not remove any data from store. During compaction the old data won't be considered. 
+ * @param tableName + * @param specs + * @param ifExists + * @param purge + * @param retainData + */ +case class CarbonAlterTableDropHivePartitionCommand( + tableName: TableIdentifier, + specs: Seq[TablePartitionSpec], + ifExists: Boolean, + purge: Boolean, + retainData: Boolean) + extends AtomicRunnableCommand { + + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + if (table.isHivePartitionTable) { + try { + specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f))) + } catch { + case e: Exception => + if (!ifExists) { + throw e + } else { + log.warn(e.getMessage) + return Seq.empty[Row] + } + } + + // Drop the partitions from hive. + AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) + .run(sparkSession) + } + Seq.empty[Row] + } + + + override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { + AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists) + val msg = s"Got exception $exception when processing data of drop partition." 
+ + "Adding back partitions to the metadata" + LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg) + Seq.empty[Row] + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + var locks = List.empty[ICarbonLock] + val uniqueId = System.currentTimeMillis().toString + try { + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.DROP_TABLE_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.ALTER_PARTITION_LOCK) + locks = AlterTableUtil.validateTableAndAcquireLock( + table.getDatabaseName, + table.getTableName, + locksToBeAcquired)(sparkSession) + val partitionNames = specs.flatMap { f => + f.map(k => k._1 + "=" + k._2) + }.toSet + val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier) + .getValidAndInvalidSegments.getValidSegments + try { + // First drop the partitions from partition mapper files of each segment + new CarbonDropPartitionRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + partitionNames.toSeq, + uniqueId).collect() + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + false, + uniqueId).collect() + throw e + } + // commit the drop partitions from carbon store + new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + true, + uniqueId).collect() + // Update the loadstatus with update time to clear cache from driver. 
+ val segmentSet = new util.HashSet[String](new SegmentStatusManager(table + .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) + CarbonUpdateUtil.updateTableMetadataStatus( + segmentSet, + table, + uniqueId, + true, + new util.ArrayList[String]) + DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) + } finally { + AlterTableUtil.releaseLocks(locks) + } + Seq.empty[Row] + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala deleted file mode 100644 index fd59587..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.command.partition - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} -import org.apache.spark.util.AlterTableUtil - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} - -/** - * Drop the partitions from hive and carbon store. It drops the partitions in following steps - * 1. Drop the partitions from carbon store, it just create one new mapper file in each segment - * with uniqueid. - * 2. Drop partitions from hive. - * 3. In any above step fails then roll back the newly created files - * 4. After success of steps 1 and 2 , it commits the files by removing the old fails. - * Here it does not remove any data from store. During compaction the old data won't be considered. 
- * @param tableName - * @param specs - * @param ifExists - * @param purge - * @param retainData - */ -case class CarbonStandardAlterTableDropPartition( - tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], - ifExists: Boolean, - purge: Boolean, - retainData: Boolean) - extends AtomicRunnableCommand { - - - override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - if (table.isHivePartitionTable) { - try { - specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f))) - } catch { - case e: Exception => - if (!ifExists) { - throw e - } else { - log.warn(e.getMessage) - return Seq.empty[Row] - } - } - - // Drop the partitions from hive. - AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) - .run(sparkSession) - } - Seq.empty[Row] - } - - - override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists) - val msg = s"Got exception $exception when processing data of drop partition." 
+ - "Adding back partitions to the metadata" - LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg) - Seq.empty[Row] - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - var locks = List.empty[ICarbonLock] - val uniqueId = System.currentTimeMillis().toString - try { - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, - LockUsage.COMPACTION_LOCK, - LockUsage.DELETE_SEGMENT_LOCK, - LockUsage.DROP_TABLE_LOCK, - LockUsage.CLEAN_FILES_LOCK, - LockUsage.ALTER_PARTITION_LOCK) - locks = AlterTableUtil.validateTableAndAcquireLock( - table.getDatabaseName, - table.getTableName, - locksToBeAcquired)(sparkSession) - val partitionNames = specs.flatMap { f => - f.map(k => k._1 + "=" + k._2) - }.toSet - val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier) - .getValidAndInvalidSegments.getValidSegments - try { - // First drop the partitions from partition mapper files of each segment - new CarbonDropPartitionRDD(sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - partitionNames.toSeq, - uniqueId).collect() - } catch { - case e: Exception => - // roll back the drop partitions from carbon store - new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - false, - uniqueId).collect() - throw e - } - // commit the drop partitions from carbon store - new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - true, - uniqueId).collect() - // Update the loadstatus with update time to clear cache from driver. 
- val segmentSet = new util.HashSet[String](new SegmentStatusManager(table - .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) - CarbonUpdateUtil.updateTableMetadataStatus( - segmentSet, - table, - uniqueId, - true, - new util.ArrayList[String]) - DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) - } finally { - AlterTableUtil.releaseLocks(locks) - } - Seq.empty[Row] - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 3eed726..a95693c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -104,8 +104,15 @@ with Serializable { model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) - CarbonTableOutputFormat.setLoadModel(conf, model) CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) + // Set the update timestamp if user sets in case of update query. 
It needs to be updated + // in load status update time + val updateTimeStamp = options.getOrElse("updatetimestamp", null) + if (updateTimeStamp != null) { + conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp) + model.setFactTimeStamp(updateTimeStamp.toLong) + } + CarbonTableOutputFormat.setLoadModel(conf, model) new OutputWriterFactory { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index eeadbf6..89fcfd2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand} -import org.apache.spark.sql.execution.command.partition.{CarbonShowCarbonPartitionsCommand, CarbonStandardAlterTableDropPartition} +import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand} import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} @@ -174,7 +174,7 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy { .tableExists(tableName)(sparkSession) if (isCarbonTable) { ExecutedCommandExec( - CarbonStandardAlterTableDropPartition( + CarbonAlterTableDropHivePartitionCommand( tableName, specs, ifExists, http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 19c8f03..7b1ab9d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -104,6 +104,11 @@ public class CarbonDataLoadConfiguration { // contains metadata used in write step of loading process private TableSpec tableSpec; + /** + * Number of thread cores to use while writing data files + */ + private short writingCoresCount; + public CarbonDataLoadConfiguration() { } @@ -351,5 +356,11 @@ public class CarbonDataLoadConfiguration { this.tableSpec = tableSpec; } + public short getWritingCoresCount() { + return writingCoresCount; + } + public void setWritingCoresCount(short writingCoresCount) { + this.writingCoresCount = writingCoresCount; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 787cb7b..87d5b97 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -224,7 +224,11 @@ public final class DataLoadProcessBuilder { configuration.setPreFetch(loadModel.isPreFetch()); configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns()); configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns()); - + // For partition loading always use single core as it already runs in multiple + // threads per partition + if (carbonTable.isHivePartitionTable()) { + configuration.setWritingCoresCount((short) 1); + } TableSpec tableSpec = new TableSpec(dimensions, measures); configuration.setTableSpec(tableSpec); return configuration; http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index 08d1497..66943c8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -18,15 +18,21 @@ package org.apache.carbondata.processing.loading.iterator; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.CarbonIterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * It is wrapper class to hold the rows in batches when record writer writes the data and allows * to iterate on it during data load. 
It uses blocking queue to coordinate between read and write. */ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { + private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class); + private boolean close = false; /** @@ -50,34 +56,48 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { @Override public boolean hasNext() { - return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext(); - } - - @Override - public String[] next() { - if (readBatch == null || !readBatch.hasNext() && !close) { + if (readBatch == null || !readBatch.hasNext()) { try { - readBatch = queue.take(); + if (!close) { + readBatch = queue.poll(5, TimeUnit.MINUTES); + if (readBatch == null) { + LOG.warn("This scenario should not happen"); + return false; + } + } else { + readBatch = queue.poll(); + if (readBatch == null) { + return false; + } + } } catch (InterruptedException e) { throw new RuntimeException(e); } } - return readBatch.next(); + return readBatch.hasNext(); } @Override - public void close() { - if (loadBatch.isLoading()) { - try { - loadBatch.readyRead(); - if (loadBatch.size > 0) { - queue.put(loadBatch); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); + public String[] next() { + return readBatch.next(); + } + + public void closeWriter() { + try { + loadBatch.readyRead(); + if (loadBatch.size > 0) { + queue.put(loadBatch); } + } catch (InterruptedException e) { + throw new RuntimeException(e); } close = true; + // It is required if the thread waits for take. 
+ if (queue.isEmpty()) { + if (!queue.offer(new RowBatch(0))) { + LOG.warn("The default last element is not added to queue"); + } + } } private static class RowBatch extends CarbonIterator<String[]> { @@ -88,8 +108,6 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { private int size; - private boolean isLoading = true; - private RowBatch(int size) { batch = new String[size][]; this.size = size; @@ -108,11 +126,6 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { public void readyRead() { size = counter; counter = 0; - isLoading = false; - } - - public boolean isLoading() { - return isLoading; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 75fcea3..2e62d28 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -161,6 +161,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { numberOfCores = 1; } + // Overriding it to the task specified cores. 
+ if (model.getWritingCoresCount() > 0) { + numberOfCores = model.getWritingCoresCount(); + } blockletProcessingCount = new AtomicInteger(0); producerExecutorService = Executors.newFixedThreadPool(numberOfCores, http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 96bd2e3..d15152c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -161,6 +161,8 @@ public class CarbonFactDataHandlerModel { private DataMapWriterListener dataMapWriterlistener; + private short writingCoresCount; + /** * Create the model using @{@link CarbonDataLoadConfiguration} */ @@ -260,6 +262,7 @@ public class CarbonFactDataHandlerModel { DataMapWriterListener listener = new DataMapWriterListener(); listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId()); carbonFactDataHandlerModel.dataMapWriterlistener = listener; + carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); return carbonFactDataHandlerModel; } @@ -321,6 +324,7 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.tableSpec = new TableSpec( segmentProperties.getDimensions(), segmentProperties.getMeasures()); + return carbonFactDataHandlerModel; } @@ -563,6 +567,10 @@ public class CarbonFactDataHandlerModel { return sortScope; } + public short getWritingCoresCount() { + return writingCoresCount; + } + public DataMapWriterListener getDataMapWriterlistener() { return dataMapWriterlistener; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 10fdd31..fef6930 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -858,17 +858,19 @@ public final class CarbonLoaderUtil { } /* - * This method will add data size and index size into tablestatus for each segment + * This method will add data size and index size into tablestatus for each segment. And also + * returns the size of the segment. */ - public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, + public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, String segmentId, CarbonTable carbonTable) throws IOException { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId); - loadMetadataDetails - .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString()); - loadMetadataDetails - .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString()); + Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); + loadMetadataDetails.setDataSize(String.valueOf(dataSize)); + Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE); + loadMetadataDetails.setIndexSize(String.valueOf(indexSize)); + return dataSize + indexSize; } }
