Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212329045
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
+import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
+
+/**
+ * Used to read and write data stored in carbondata files to/from the Spark execution engine.
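+ *
+ * Example usage (an illustrative sketch; the paths are placeholders, and the "carbon"
+ * short name is registered below via DataSourceRegister):
+ * {{{
+ *   val df = spark.read.format("carbon").load("/path/to/carbon/files")
+ *   df.write.format("carbon").save("/path/to/output")
+ * }}}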
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {
+
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ /**
+ * If the user does not provide a schema while reading the data, then Spark calls this
+ * method to infer the schema from the carbondata files. It reads the schema present in the
+ * carbondata files and returns it.
+ */
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val tablePath = options.get("path") match {
+ case Some(path) => path
+ case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+ }
+
+ val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+ val table = CarbonTable.buildFromTableInfo(tableInfo)
+ var schema = new StructType
+ tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
+ // TODO find better way to know its a child
+ if (!col.getColumnName.contains(".")) {
+ schema = schema.add(
+ col.getColumnName,
+ SparkTypeConverter.convertCarbonToSparkDataType(col, table))
+ }
+ }
+ Some(schema)
+ }
+
+
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client-side job preparation
+ * is done here.
+ */
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ val conf = job.getConfiguration
+
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ model.setLoadWithoutConverterStep(true)
+ CarbonTableOutputFormat.setLoadModel(conf, model)
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+ new Path(path).getParent.toString
+ } else {
+ path
+ }
+ context.getConfiguration.set("carbon.outputformat.writepath",
updatedPath)
+ context.getConfiguration.set("carbon.outputformat.taskno",
System.nanoTime() + "")
+ new CarbonOutputWriter(path, context, dataSchema.fields)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ CarbonTablePath.CARBON_DATA_EXT
+ }
+ }
+ }
+
+ /**
+ * A shim trait to keep this code compiling against both Spark 2.1 and Spark 2.2
+ */
+ private trait AbstractCarbonOutputWriter {
+ def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+ def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def write(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def writeCarbon(row: InternalRow): Unit
+ }
+
+
+ /**
+ * Writer class for carbondata files
+ */
+ private class CarbonOutputWriter(path: String,
+ context: TaskAttemptContext,
+ fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
+
+ private val writable = new ObjectArrayWritable
+
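+ // Spark encodes DateType as days since epoch; carbon shifts date values by this cut-off
+ // (see the DateType cases in extractData below)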
+ private val cutOffDate = Integer.MAX_VALUE >> 1
+
+ private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
+ new CarbonTableOutputFormat().getRecordWriter(context)
+
+ /**
+ * Writes Spark's internal row to the carbondata record writer
+ */
+ def writeCarbon(row: InternalRow): Unit = {
+ val data: Array[AnyRef] = extractData(row, fieldTypes)
+ writable.set(data)
+ recordWriter.write(NullWritable.get(), writable)
+ }
+
+ override def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+
+ /**
+ * Converts the internal row to objects that carbondata understands
+ */
+ private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
+ val data = new Array[AnyRef](fieldTypes.length)
+ var i = 0
+ while (i < fieldTypes.length) {
+ if (!row.isNullAt(i)) {
+ fieldTypes(i).dataType match {
+ case StringType =>
+ data(i) = row.getString(i)
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case d: TimestampType =>
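+ // Spark's TimestampType is microseconds since epoch; convert to the
+ // milliseconds that carbon expects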
+ data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
+ case other =>
+ data(i) = row.get(i, other)
+ }
+ } else {
+ setNull(fieldTypes(i).dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
+ dataType match {
+ case d: DateType =>
+ // 1 is treated as a null date in carbon
+ data(i) = 1.asInstanceOf[AnyRef]
+ case _ =>
+ }
+ }
+
+ /**
+ * Converts array data to objects that carbondata understands
+ */
+ private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
+ val data = new Array[AnyRef](row.numElements())
+ var i = 0
+ while (i < data.length) {
+ if (!row.isNullAt(i)) {
+ dataType match {
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case other => data(i) = row.get(i, dataType)
+ }
+ } else {
+ setNull(dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ override def close(): Unit = {
+ recordWriter.close(context)
+ }
+ }
+
+ override def shortName(): String = "carbon"
+
+ override def toString: String = "carbon"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+ /**
+ * Whether to use the vector reader while reading data.
+ * Vector reading is not supported for complex types.
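+ * The flag is read from the session conf first, then the JVM system property,
+ * then CarbonProperties.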
+ */
+ private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val vectorizedReader = {
+ if (sparkSession.sqlContext.sparkSession.conf
+ .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else {
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+ }
+ }
+ vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+
+ /**
+ * Returns whether this format support returning columnar batch or not.
+ */
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+ /**
+ * Returns a function that can be used to read a single carbondata file in as an
+ * Iterator of InternalRow.
+ */
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
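+ // Convert the pushed-down Spark filters into carbon expressions and AND them
+ // into a single predicate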
+ val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+ CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+
+ val projection = requiredSchema.map(_.name).toArray
+ val carbonProjection = new CarbonProjection
+ projection.foreach(carbonProjection.addColumn)
+
+ var supportBatchValue: Boolean = false
+
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+ val readVector = supportVector(sparkSession, resultSchema)
+ if (readVector) {
+ supportBatchValue = supportBatch(sparkSession, resultSchema)
+ }
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ CarbonInputFormat
+ .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+ CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+ CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
+ filter match {
+ case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case None => None
+ }
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ file: PartitionedFile => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
--- End diff ---
Can we just check for the carbondata extension instead?
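
For example, a minimal sketch of that check (isCarbonDataFile is a hypothetical helper name, not something in this PR):

    // Keep only .carbondata data files; index and merge-index files
    // are then skipped by the same positive check.
    def isCarbonDataFile(filePath: String): Boolean =
      filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)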
---