This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new da7db9a  [SPARK-23749][SQL] Replace built-in Hive API (isSub/toKryo) and remove OrcProto.Type usage
da7db9a is described below

commit da7db9abf61b85860ace714d34ab17810c775e25
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Mar 14 11:41:40 2019 -0700

    [SPARK-23749][SQL] Replace built-in Hive API (isSub/toKryo) and remove OrcProto.Type usage
    
    ## What changes were proposed in this pull request?
    
    To keep the built-in Hive upgrade smaller, this PR works around the
    three simplest API changes first.
    
    ## How was this patch tested?
    
    Manual tests.
    
    Closes #24018 from wangyum/SPARK-23749.
    
    Lead-authored-by: Yuming Wang <[email protected]>
    Co-authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java   |  8 +++++---
 .../apache/spark/sql/hive/execution/SaveAsHiveFile.scala |  9 ++++++++-
 .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala    | 16 +++++++++++++++-
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
index f093637..8e9362a 100644
--- a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * This is based on hive-exec-1.2.1
@@ -42,8 +41,11 @@ public class SparkOrcNewRecordReader extends
 
   public SparkOrcNewRecordReader(Reader file, Configuration conf,
       long offset, long length) throws IOException {
-    List<OrcProto.Type> types = file.getTypes();
-    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+    if (file.getTypes().isEmpty()) {
+      numColumns = 0;
+    } else {
+      numColumns = file.getTypes().get(0).getSubtypesCount();
+    }
     value = new OrcStruct(numColumns);
     this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
         length);
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 4ddba50..73b3f20 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -227,7 +227,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
     // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
     // under the table directory.
-    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
       !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
       logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
         "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
@@ -253,6 +253,13 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     dir
   }
 
+  // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir().
+  private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
+    val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
+    val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
+    path1.startsWith(path2)
+  }
+
   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
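
A quick usage sketch of the isSubDir helper copied above (illustrative only; the object name and paths below are hypothetical, not part of the patch). Qualifying both paths and appending Path.SEPARATOR before the prefix check keeps a sibling directory whose name merely starts with the parent's name from being treated as a child:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object IsSubDirSketch {
      // Same logic as the copied helper: qualify both paths, append a separator, prefix-check.
      private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
        val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
        val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
        path1.startsWith(path2)
      }

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.getLocal(new Configuration())
        // A staging dir under the table directory is a sub-directory...
        println(isSubDir(new Path("/warehouse/tab/.hive-staging"), new Path("/warehouse/tab"), fs)) // true
        // ...but a sibling such as /warehouse/tab2 is not, thanks to the trailing separator.
        println(isSubDir(new Path("/warehouse/tab2"), new Path("/warehouse/tab"), fs)) // false
      }
    }
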
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index bfb0a95..9ac3e98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -24,10 +24,14 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Output
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc._
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
@@ -130,7 +134,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -195,6 +199,16 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     case _ => false
   }
+
+  // HIVE-11253 moved `toKryo` from `SearchArgument` to `storage-api` module.
+  // This is copied from Hive 1.2's SearchArgumentImpl.toKryo().
+  private def toKryo(sarg: SearchArgument): String = {
+    val kryo = new Kryo()
+    val out = new Output(4 * 1024, 10 * 1024 * 1024)
+    kryo.writeObject(out, sarg)
+    out.close()
+    Base64.encodeBase64String(out.toBytes)
+  }
 }
 
 private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
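
For context on the copied toKryo() helper, the sketch below round-trips the same Kryo + Base64 encoding; the Array[String] payload and object name are stand-ins for a real Hive SearchArgument (illustrative only, not part of the patch):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.commons.codec.binary.Base64

    object KryoBase64Sketch {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()

        // Encode with the same buffer sizes as the helper: 4 KB initial, 10 MB max.
        val out = new Output(4 * 1024, 10 * 1024 * 1024)
        kryo.writeObject(out, Array("col > 10", "col IS NOT NULL"))
        out.close()
        val encoded = Base64.encodeBase64String(out.toBytes)

        // Decode, as a reader of the configuration value would.
        val in = new Input(Base64.decodeBase64(encoded))
        val decoded = kryo.readObject(in, classOf[Array[String]])
        in.close()
        println(decoded.mkString(" AND "))
      }
    }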


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
