http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index e4d3ba5..8aa18d5 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -17,14 +17,31 @@ package org.apache.carbondata.presto.impl; -import com.facebook.presto.spi.SchemaTableName; -import com.facebook.presto.spi.classloader.ThreadContextClassLoader; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.*; -import org.apache.carbondata.core.datastore.block.*; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.DataRefNodeFinder; +import org.apache.carbondata.core.datastore.IndexKey; +import org.apache.carbondata.core.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.datastore.block.AbstractIndex; +import org.apache.carbondata.core.datastore.block.BlockletInfos; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CacheClient; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; + +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Inject; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.thrift.TBase; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import static java.util.Objects.requireNonNull; import com.facebook.presto.spi.TableNotFoundException; @@ -392,8 +415,9 @@ public class CarbonTableReader { TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo(); if (IUDTable) { - if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId, - updateStatusManager)) { + if (CarbonUtil + .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(), + invalidBlockVOForSegmentId, updateStatusManager)) { continue; } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 277005b..da0d082 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonDataMergerUtil import org.apache.carbondata.processing.model.CarbonLoadModel @@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V]( var blocksOfLastSegment: List[TableBlockInfo] = null - CarbonInputFormat.setSegmentsToAccess( + CarbonTableInputFormat.setSegmentsToAccess( job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava) // get splits http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index bc5ca06..6bc7564 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} import org.apache.carbondata.processing.merger._ import org.apache.carbondata.processing.model.CarbonLoadModel @@ -293,7 +293,7 @@ class CarbonMergerRDD[K, V]( for (eachSeg <- carbonMergerMapping.validSegments) { // map for keeping the relation of a task and its blocks. - job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg) + job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg) if (updateStatusManager.getUpdateStatusDetails.length != 0) { updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg) @@ -315,7 +315,8 @@ class CarbonMergerRDD[K, V]( updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString) ) ((!updated) || ((updated) && (!CarbonUtil - .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)))) + .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath, + updateDetails, updateStatusManager)))) }) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 85c4bc4..c383779 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} import org.apache.carbondata.hadoop._ +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.spark.load.CarbonLoaderUtil @@ -246,22 +247,23 @@ class CarbonScanRDD( iterator.asInstanceOf[Iterator[InternalRow]] } - private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = { - CarbonInputFormat.setTableInfo(conf, tableInfo) + private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = { + CarbonTableInputFormat.setTableInfo(conf, tableInfo) createInputFormat(conf) } - private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = { - CarbonInputFormat.setCarbonReadSupport(conf, readSupport) - CarbonInputFormat.setTableInfo(conf, getTableInfo) + private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = { + CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport) + CarbonTableInputFormat.setTableInfo(conf, tableInfo) createInputFormat(conf) } - private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = { - val format = new CarbonInputFormat[Object] - CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath)) - CarbonInputFormat.setFilterPredicates(conf, filterExpression) - CarbonInputFormat.setColumnProjection(conf, columnProjection) + private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { + val format = new CarbonTableInputFormat[Object] + CarbonTableInputFormat.setTablePath(conf, + identifier.appendWithLocalPrefix(identifier.getTablePath)) + CarbonTableInputFormat.setFilterPredicates(conf, filterExpression) + CarbonTableInputFormat.setColumnProjection(conf, columnProjection) format } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala index 2ca3b8c..4950227 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.hadoop.CarbonInputFormat +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat /** @@ -38,8 +38,8 @@ object QueryPlanUtil { * createCarbonInputFormat from query model */ def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) : - (CarbonInputFormat[Array[Object]], Job) = { - val carbonInputFormat = new CarbonInputFormat[Array[Object]]() + (CarbonTableInputFormat[Array[Object]], Job) = { + val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]() val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) @@ -47,8 +47,8 @@ object QueryPlanUtil { } def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier, - conf: Configuration) : CarbonInputFormat[V] = { - val carbonInputFormat = new CarbonInputFormat[V]() + conf: Configuration) : CarbonTableInputFormat[V] = { + val carbonInputFormat = new CarbonTableInputFormat[V]() val job: Job = new Job(conf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) carbonInputFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 2fc93e6..ba89d51 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.scan.expression.logical.AndExpression import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader} import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.{CarbonFilters, CarbonOption} @@ -89,16 +90,17 @@ private[sql] case class CarbonDatasourceHadoopRelation( filters.flatMap { filter => CarbonFilters.createCarbonFilter(dataSchema, filter) }.reduceOption(new AndExpression(_, _)) - .foreach(CarbonInputFormat.setFilterPredicates(conf, _)) + .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _)) val projection = new CarbonProjection requiredColumns.foreach(projection.addColumn) - CarbonInputFormat.setColumnProjection(conf, projection) - CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl]) + CarbonTableInputFormat.setColumnProjection(conf, projection) + CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl]) + new CarbonHadoopFSRDD[Row](sqlContext.sparkContext, new SerializableConfiguration(conf), absIdentifier, - classOf[CarbonInputFormat[Row]], + classOf[CarbonTableInputFormat[Row]], classOf[Row] ) } @@ -118,7 +120,7 @@ class CarbonHadoopFSRDD[V: ClassTag]( @transient sc: SparkContext, conf: SerializableConfiguration, identifier: AbsoluteTableIdentifier, - inputFormatClass: Class[_ <: CarbonInputFormat[V]], + inputFormatClass: Class[_ <: CarbonTableInputFormat[V]], valueClass: Class[V]) extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index a292cde..c38f0e1 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand( override def run(sqlContext: SQLContext): Seq[Row] = { - - // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution - // .EXECUTION_ID_KEY, null) - // DataFrame(sqlContext, plan).show(truncate = false) - // return Seq.empty - - val res = plan find { case relation: LogicalRelation if (relation.relation .isInstanceOf[CarbonDatasourceRelation]) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala index 70c7caf..e0a8b58 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.hadoop.CarbonInputFormat +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat /** * All the utility functions for carbon plan creation @@ -37,8 +37,8 @@ object QueryPlanUtil { * createCarbonInputFormat from query model */ def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) : - (CarbonInputFormat[Array[Object]], Job) = { - val carbonInputFormat = new CarbonInputFormat[Array[Object]]() + (CarbonTableInputFormat[Array[Object]], Job) = { + val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]() val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) @@ -46,8 +46,8 @@ object QueryPlanUtil { } def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier, - conf: Configuration) : CarbonInputFormat[V] = { - val carbonInputFormat = new CarbonInputFormat[V]() + conf: Configuration) : CarbonTableInputFormat[V] = { + val carbonInputFormat = new CarbonTableInputFormat[V]() val job: Job = new Job(conf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) carbonInputFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 1788ccb..08b8600 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -378,4 +379,35 @@ public class CarbonCompactionUtil { } return restructuredBlockExists; } + + /** + * This method will check for any restructured block in the blocks selected for compaction + * + * @param segmentMapping + * @param tableLastUpdatedTime + * @return + */ + public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping, + long tableLastUpdatedTime) { + boolean restructuredBlockExists = false; + for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) { + String segmentId = taskMap.getKey(); + TaskBlockInfo taskBlockInfo = taskMap.getValue(); + Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList(); + for (List<TableBlockInfo> listMetadata : infoList) { + for (TableBlockInfo blockInfo : listMetadata) { + // if schema modified timestamp is greater than footer stored schema timestamp, + // it indicates it is a restructured block + if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) { + restructuredBlockExists = true; + break; + } + } + } + if (restructuredBlockExists) { + break; + } + } + return restructuredBlockExists; + } }
