Repository: spark
Updated Branches:
  refs/heads/branch-1.6 355bd72e0 -> 9e8a8f71f


[SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf.

TaskAttemptContext's constructor clones the configuration instead of 
referencing it. If setConf is called after the TaskAttemptContext has been 
created, any changes that setConf makes to the configuration are invisible to 
RecordReader instances.
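
The ordering issue described above can be reproduced without Hadoop. The 
following is a minimal sketch, not the Spark or Hadoop code: ToyConf, 
ToyContext, ToyFormat and OrderingDemo are hypothetical stand-ins, and the 
"keyspace" value "titan" is an assumed example. It only illustrates why a 
context that copies its configuration must be created after setConf.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a mutable configuration; not a Hadoop class.
final class ToyConf(initial: Map[String, String] = Map.empty) {
  private val entries = mutable.Map.empty[String, String] ++= initial
  def set(key: String, value: String): Unit = entries(key) = value
  def get(key: String): Option[String] = entries.get(key)
  def snapshot: Map[String, String] = entries.toMap
}

// Hypothetical stand-in for a context that copies the configuration at
// construction time, which is the behavior the commit message describes.
final class ToyContext(conf: ToyConf) {
  private val copied = new ToyConf(conf.snapshot)
  def get(key: String): Option[String] = copied.get(key)
}

// Hypothetical input format whose setConf mutates the configuration it is given.
final class ToyFormat {
  def setConf(conf: ToyConf): Unit = conf.set("keyspace", "titan") // assumed value
}

object OrderingDemo {
  // Old order: the context is built first, so it never sees the setConf change.
  def contextBeforeSetConf(): Option[String] = {
    val conf = new ToyConf()
    val context = new ToyContext(conf)
    new ToyFormat().setConf(conf)
    context.get("keyspace")
  }

  // New order: setConf runs first, so the copy includes its changes.
  def contextAfterSetConf(): Option[String] = {
    val conf = new ToyConf()
    new ToyFormat().setConf(conf)
    val context = new ToyContext(conf)
    context.get("keyspace")
  }

  def main(args: Array[String]): Unit = {
    println(s"context created before setConf: ${contextBeforeSetConf()}")
    println(s"context created after setConf:  ${contextAfterSetConf()}")
  }
}
```

In this sketch the first method returns None and the second returns 
Some("titan"), which mirrors the missing 'keyspace' field a reader would see 
under the old ordering.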

For example, Titan's InputFormat modifies conf when setConf is called: it 
wraps Cassandra's ColumnFamilyInputFormat and appends Cassandra's 
configuration. This change fixes the following error when using Titan's 
CassandraInputFormat with Spark:

*java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: 
Required field 'keyspace' was not present! Struct: 
set_keyspace_args(keyspace:null)*

There's a discussion of this error here: 
https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE

Author: Anderson de Andrade <adeandr...@verticalscope.com>

Closes #10046 from adeandrade/newhadooprdd-fix.

(cherry picked from commit f434f36d508eb4dcade70871611fc022ae0feb56)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e8a8f71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e8a8f71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e8a8f71

Branch: refs/heads/branch-1.6
Commit: 9e8a8f71fb62f55d8fcd8f319c8c25407a8d0010
Parents: 355bd72
Author: Anderson de Andrade <adeandr...@verticalscope.com>
Authored: Thu Dec 3 16:37:00 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Dec 3 16:37:11 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e8a8f71/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d196099..86f38ae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -138,14 +138,14 @@ class NewHadoopRDD[K, V](
       }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
       format match {
         case configurable: Configurable =>
           configurable.setConf(conf)
         case _ =>
       }
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)


