Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157339822
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
+
+class CarbonFileFormat
+ extends FileFormat
+ with DataSourceRegister
+ with Logging
+  with Serializable {
+
+ override def shortName(): String = "carbondata"
+
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
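+    // No schema inference from the files: a Carbon table's schema lives in its own metadata.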
+ None
+ }
+
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val conf = job.getConfiguration
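+    // Plug Carbon's output committer into the Hadoop job configuration.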
+ conf.setClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[CarbonOutputCommitter],
+ classOf[CarbonOutputCommitter])
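+    // Marker property; presumably only its presence is checked, since key and value are identical.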
+ conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+ sparkSession.sessionState.conf.setConfString(
+ "spark.sql.sources.commitProtocolClass",
+      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+ job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+
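+    // Resolve the target CarbonTable from the metadata cache, defaulting to the "default" database.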
+ var table = CarbonMetadata.getInstance().getCarbonTable(
+ options.getOrElse("dbName", "default"), options("tableName"))
+// table = CarbonTable.buildFromTableInfo(table.getTableInfo, true)
--- End diff ---
remove it
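
i.e. drop the dead line so the lookup reads as below (a sketch; `table` could then also become a `val`, unless it is reassigned further down in this file):

    val table = CarbonMetadata.getInstance().getCarbonTable(
      options.getOrElse("dbName", "default"), options("tableName"))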
---