[
https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15639686#comment-15639686
]
ASF GitHub Bot commented on CARBONDATA-2:
-----------------------------------------
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/263#discussion_r86665514
--- Diff:
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil,
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ var storeLocation: String,
+ hdfsStoreLocation: String,
+ kettleHomePath: String,
+ partitioner: Partitioner,
+ columinar: Boolean,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ blocksGroupBy: Array[(String, Array[BlockDetails])],
+ isTableSplitPartition: Boolean)
+ extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging
{
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
+ // A Hadoop Configuration can be about 10 KB, which is pretty big, so
broadcast it
+ private val confBroadcast =
+ sc.broadcast(new
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+ override def getPartitions: Array[Partition] = {
+ if (isTableSplitPartition) {
+ // for table split partition
+ var splits = Array[TableSplit]()
+
+ if (carbonLoadModel.isDirectLoad) {
+ splits =
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+ partitioner.nodeList, partitioner.partitionCount)
+ }
+ else {
+ splits =
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, null, partitioner)
+ }
+
+ splits.zipWithIndex.map { s =>
+ // filter the same partition unique id, because only one will
match, so get 0 element
+ val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+ p._1 == s._1.getPartition.getUniqueID)(0)._2
+ new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+ }
+ } else {
+ // for node partition
+ blocksGroupBy.zipWithIndex.map { b =>
+ new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+ }
+ }
+ }
+
+ override def checkpoint() {
+ // Do nothing. Hadoop RDD should not be checkpointed.
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext):
Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ var model: CarbonLoadModel = _
+ var uniqueLoadStatusId =
+ carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
theSplit.index
+ try {
+ loadMetadataDetails.setPartitionCount(partitionID)
+
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ val recordReaders = getRecordReaders
+ val loader = new SparkPartitionLoader(model,
+ theSplit.index,
+ hdfsStoreLocation,
+ kettleHomePath,
+ loadCount,
+ loadMetadataDetails)
+ // Intialize to set carbon properties
+ loader.initialize()
+
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ CarbonLoaderUtil.executeNewDataLoad(model,
+ loader.storeLocation,
+ hdfsStoreLocation,
+ recordReaders)
+ } catch {
+ case e: Exception =>
+ logInfo("DataLoad failure")
+ LOGGER.error(e)
+ throw e
+ }
+
+ def getRecordReaders: Array[RecordReader[NullWritable,
StringArrayWritable]] = {
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true,
theSplit.index, 0)
+ val configuration: Configuration = confBroadcast.value.value
+ configureCSVInputFormat(configuration)
+ val hadoopAttemptContext = newTaskAttemptContext(configuration,
attemptId)
+ val format = new CSVInputFormat
+ if (isTableSplitPartition) {
+ // for table split partition
+ val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ model = carbonLoadModel.getCopyWithPartition(
+
split.serializableHadoopSplit.value.getPartition.getUniqueID,
+
split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader,
carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(
+
split.serializableHadoopSplit.value.getPartition.getUniqueID)
+ }
+ partitionID =
split.serializableHadoopSplit.value.getPartition.getUniqueID
+
+ StandardLogService.setThreadName(partitionID, null)
+
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+ partitionID, split.partitionBlocksDetail.length)
+ val readers =
+ split.partitionBlocksDetail.map(format.createRecordReader(_,
hadoopAttemptContext))
+ readers.zipWithIndex
+ .foreach(f =>
f._1.initialize(split.partitionBlocksDetail(f._2), hadoopAttemptContext))
--- End diff --
Change `f` to `case (reader, index)` to make it more readable,
and move `.foreach` up onto the previous line (i.e. `readers.zipWithIndex.foreach { ... }`).
> Remove kettle for loading data
> ------------------------------
>
> Key: CARBONDATA-2
> URL: https://issues.apache.org/jira/browse/CARBONDATA-2
> Project: CarbonData
> Issue Type: Improvement
> Components: data-load
> Reporter: Liang Chen
> Priority: Critical
> Labels: features
> Fix For: 0.3.0-incubating
>
> Attachments: CarbonDataLoadingdesign.pdf
>
>
> Remove kettle for loading data module
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)