[CARBONDATA-2080] [S3-Implementation] Propagated hadoopConf from driver to executor for S3 implementation in cluster mode.
Problem: hadoopConf was not being propagated from the driver to the executors,
which is why data loads failed in a distributed environment.

Solution: Set the Hadoop conf in the base class CarbonRDD so that it is
serialized with each RDD and restored on the executors.

How to verify this PR: Execute a load in cluster mode against an S3 location;
it should succeed.

This closes #1860


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b144bff1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b144bff1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b144bff1

Branch: refs/heads/carbonstore-rebase
Commit: b144bff11b83400b6f87ef618c1465ea5c816bed
Parents: 6a6924c
Author: Jatin <jatin.de...@knoldus.in>
Authored: Thu Jan 25 16:53:00 2018 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Feb 9 01:39:55 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  2 +-
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  2 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  2 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  2 +-
 .../spark/rdd/CarbonDropPartitionRDD.scala      |  4 +--
 .../spark/rdd/CarbonDropTableRDD.scala          |  2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  3 +-
 .../spark/rdd/CarbonMergeFilesRDD.scala         |  0
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 32 ++++++++++++++++++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |  2 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  3 +-
 .../apache/spark/rdd/UpdateCoalescedRDD.scala   |  2 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |  2 +-
 16 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
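
The heart of the change is in CarbonRDD.scala below: the driver snapshots
hadoopConf into a compressed byte array (confBytes) that is serialized along
with the RDD, and each executor rebuilds the Configuration from those bytes
before computing a partition. The following minimal sketch shows that round
trip in isolation; it is an illustration, not code from this commit — it uses
plain byte arrays where the patch compresses via CarbonData's
CompressorFactory, and the object name ConfRoundTrip is invented here.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

object ConfRoundTrip {

  // Driver side: Configuration is Writable but not java.io.Serializable,
  // so it has to be written out by hand before Spark ships the RDD object.
  def toBytes(conf: Configuration): Array[Byte] = {
    val bao = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bao)
    conf.write(oos)
    oos.close()
    bao.toByteArray
  }

  // Executor side: rebuild the conf before any FileSystem/S3 access.
  // Configuration(false) skips the executor's local default resources,
  // so the driver's settings are authoritative.
  def fromBytes(bytes: Array[Byte]): Configuration = {
    val conf = new Configuration(false)
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    conf.readFields(ois)
    ois.close()
    conf
  }

  def main(args: Array[String]): Unit = {
    val driverConf = new Configuration(false)
    driverConf.set("fs.s3a.access.key", "dummy-key") // illustrative property only
    val executorConf = fromBytes(toBytes(driverConf))
    assert(executorConf.get("fs.s3a.access.key") == "dummy-key")
  }
}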

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 56a66b9..7c1edea 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,7 +50,7 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     identifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 248f351..e14524e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -48,7 +48,7 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 32523d8..9936a2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -33,7 +33,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 45271a7..b11dfad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -39,7 +39,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
     dimTableName: String,
     storePath: String,
     loadMetadataDetails: List[LoadMetadataDetails])
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 9a1ef33..759ed42 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -34,7 +34,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..800cc36 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -47,7 +47,7 @@ class CarbonDropPartitionRDD(
     partitions: Seq[String],
     uniqueId: String,
     partialMatch: Boolean)
-  extends CarbonRDD[String](sc, Nil) {
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
@@ -105,7 +105,7 @@ class CarbonDropPartitionCommitRDD(
     success: Boolean,
     uniqueId: String,
     partitions: Seq[String])
-  extends CarbonRDD[String](sc, Nil) {
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 652720c..f327d88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -30,7 +30,7 @@ class CarbonDropTableRDD[V: ClassTag](
     valueClass: Value[V],
     databaseName: String,
     tableName: String)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 7acf4e2..cf22b3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -506,7 +506,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
     dictFolderPath: String)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
+    sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7eff227..1b9363d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -62,7 +62,7 @@ class CarbonMergerRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index bf46f67..6f248d2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,20 +17,26 @@ package org.apache.carbondata.spark.rdd
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 /**
  * This RDD maintains session level ThreadLocal
  */
 abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+    @transient private var deps: Seq[Dependency[_]],
+    @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
 
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -42,14 +48,24 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     info
   }
 
+  private val confBytes = {
+    val bao = new ByteArrayOutputStream()
+    val oos = new ObjectOutputStream(bao)
+    hadoopConf.write(oos)
+    oos.close()
+    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
+  }
+
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)))
+    this (oneParent.context, List(new OneToOneDependency(oneParent)),
+      oneParent.sparkContext.hadoopConfiguration)
 
   // RDD compute logic should be here
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    CarbonInputFormatUtil.setS3Configurations(getConf)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
@@ -59,6 +75,16 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
       map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
     internalCompute(split, context)
   }
+
+  private def getConf: Configuration = {
+    val configuration = new Configuration(false)
+    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
+      .unCompressByte(confBytes))
+    val ois = new ObjectInputStream(bai)
+    configuration.readFields(ois)
+    ois.close()
+    configuration
+  }
 }
 
 /**
@@ -67,7 +93,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
 abstract class CarbonRDDWithTableInfo[T: ClassTag](
     @transient sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]],
-    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) {
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
 
   def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
     this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index e17824f..06acbba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -182,7 +182,7 @@ class NewCarbonDataLoadRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     @transient hadoopConf: Configuration)
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 600cd80..60052f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -58,7 +58,7 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
     resolverIntf: FilterResolverIntf)
-  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil) {
+  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 2157799..6a97477 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -29,7 +29,8 @@ case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 class DataLoadCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil,
+    prev.sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
index 9befcaa..bcca7ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
 class UpdateCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends CarbonRDD[T](prev.context, Nil) {
+  extends CarbonRDD[T](prev.context, Nil, prev.sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b144bff1/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 5c6165d..eb39422 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -96,7 +96,7 @@ class StreamHandoffRDD[K, V](
     result: HandoffResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil) {
+) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
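
For reference, a hypothetical subclass sketching the contract that every RDD
in this diff now follows; ExampleCarbonRDD is invented for illustration and is
not part of the commit. A concrete RDD passes a driver-side Configuration to
the CarbonRDD constructor and implements internalCompute; the base class ships
the conf and re-applies the S3 settings on the executor before internalCompute
runs.

import org.apache.spark.{Partition, SparkContext, TaskContext}

import org.apache.carbondata.spark.rdd.CarbonRDD

// The third constructor argument is the driver-side conf that CarbonRDD
// now serializes, compresses and ships inside the RDD object.
class ExampleCarbonRDD(sc: SparkContext)
  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {

  override def getPartitions: Array[Partition] = Array.empty

  // Runs on the executor; by this point CarbonRDD.compute has already
  // called CarbonInputFormatUtil.setS3Configurations with the rebuilt conf.
  override def internalCompute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty
}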