Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support

Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/c53b9ff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/c53b9ff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/c53b9ff9

Branch: refs/heads/master
Commit: c53b9ff9b23755c7d211390ea97bc911a80674bf
Parents: 8dc3a02
Author: Takeshi Yamamuro <[email protected]>
Authored: Mon Apr 10 06:43:20 2017 +0900
Committer: myui <[email protected]>
Committed: Mon Apr 10 06:43:20 2017 +0900

----------------------------------------------------------------------
 .travis.yml                                     |    3 +-
 NOTICE                                          |    1 -
 bin/format_header.sh                            |    3 -
 pom.xml                                         |   11 -
 spark/spark-1.6/bin/mvn-zinc                    |   99 --
 spark/spark-1.6/extra-src/README                |    1 -
 .../org/apache/spark/sql/hive/HiveShim.scala    |  527 --------
 spark/spark-1.6/pom.xml                         |  261 ----
 .../src/main/resources/log4j.properties         |   12 -
 .../hivemall/tools/RegressionDatagen.scala      |   66 -
 .../apache/spark/sql/hive/GroupedDataEx.scala   |  303 -----
 .../org/apache/spark/sql/hive/HivemallOps.scala | 1125 ------------------
 .../apache/spark/sql/hive/HivemallUtils.scala   |  112 --
 .../src/test/resources/log4j.properties         |    7 -
 .../hivemall/mix/server/MixServerSuite.scala    |  123 --
 .../hivemall/tools/RegressionDatagenSuite.scala |   32 -
 .../scala/org/apache/spark/SparkFunSuite.scala  |   48 -
 .../ml/feature/HivemallLabeledPointSuite.scala  |   35 -
 .../scala/org/apache/spark/sql/QueryTest.scala  |  295 -----
 .../spark/sql/catalyst/plans/PlanTest.scala     |   61 -
 .../apache/spark/sql/hive/HiveUdfSuite.scala    |  113 --
 .../spark/sql/hive/HivemallOpsSuite.scala       |  665 -----------
 .../spark/sql/hive/ModelMixingSuite.scala       |  272 -----
 .../spark/streaming/HivemallOpsSuite.scala      |  123 --
 .../apache/spark/test/HivemallQueryTest.scala   |  197 ---
 .../scala/org/apache/spark/test/TestUtils.scala |   62 -
 26 files changed, 1 insertion(+), 4556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a4cb7ea..323e36a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,9 +35,8 @@ notifications:
 
 script:
   - mvn -q scalastyle:check test -Pspark-2.1
-  # test the spark-1.6/2.0 modules only in the following runs
+  # test the spark-2.0 modules only in the following runs
   - mvn -q scalastyle:check clean -Pspark-2.0 -pl spark/spark-2.0 -am test -Dtest=none
-  - mvn -q scalastyle:check clean -Pspark-1.6 -pl spark/spark-1.6 -am test -Dtest=none
 
 after_success:
   - mvn clean cobertura:cobertura coveralls:report

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 0911f50..79a944c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -50,7 +50,6 @@ o hivemall/core/src/main/java/hivemall/utils/buffer/DynamicByteArray.java
     https://orc.apache.org/
     Licensed under the Apache License, Version 2.0
 
-o hivemall/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
   hivemall/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
   hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/QueryTest.scala
   hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/bin/format_header.sh
----------------------------------------------------------------------
diff --git a/bin/format_header.sh b/bin/format_header.sh
index da67420..f2c063b 100755
--- a/bin/format_header.sh
+++ b/bin/format_header.sh
@@ -37,8 +37,5 @@ mvn license:format
 cd $HIVEMALL_HOME/spark/spark-common
 mvn license:format -P spark-2.0
 
-cd $HIVEMALL_HOME/spark/spark-1.6
-mvn license:format -P spark-1.6
-
 cd $HIVEMALL_HOME/spark/spark-2.0
 mvn license:format -P spark-2.0

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2276e9..7743d5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -293,17 +293,6 @@
                        </properties>
                </profile>
                <profile>
-                       <id>spark-1.6</id>
-                       <modules>
-                               <module>spark/spark-1.6</module>
-                               <module>spark/spark-common</module>
-                       </modules>
-                       <properties>
-                               <spark.version>1.6.2</spark.version>
-                               <spark.binary.version>1.6</spark.binary.version>
-                       </properties>
-               </profile>
-               <profile>
                        <id>compile-xgboost</id>
                        <build>
                                <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/bin/mvn-zinc
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/bin/mvn-zinc b/spark/spark-1.6/bin/mvn-zinc
deleted file mode 100755
index 759b0a5..0000000
--- a/spark/spark-1.6/bin/mvn-zinc
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Copyed from commit 48682f6bf663e54cb63b7e95a4520d34b6fa890b in Apache Spark
-
-# Determine the current working directory
-_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-# Preserve the calling directory
-_CALLING_DIR="$(pwd)"
-# Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
-
-# Installs any application tarball given a URL, the expected tarball name,
-# and, optionally, a checkable binary path to determine if the binary has
-# already been installed
-## Arg1 - URL
-## Arg2 - Tarball Name
-## Arg3 - Checkable Binary
-install_app() {
-  local remote_tarball="$1/$2"
-  local local_tarball="${_DIR}/$2"
-  local binary="${_DIR}/$3"
-  local curl_opts="--progress-bar -L"
-  local wget_opts="--progress=bar:force ${wget_opts}"
-
-  if [ -z "$3" -o ! -f "$binary" ]; then
-    # check if we already have the tarball
-    # check if we have curl installed
-    # download application
-    [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
-      echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \
-      curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
-    # if the file still doesn't exist, lets try `wget` and cross our fingers
-    [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
-      echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \
-      wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
-    # if both were unsuccessful, exit
-    [ ! -f "${local_tarball}" ] && \
-      echo -n "ERROR: Cannot download $2 with cURL or wget; " && \
-      echo "please install manually and try again." && \
-      exit 2
-    cd "${_DIR}" && tar -xzf "$2"
-    rm -rf "$local_tarball"
-  fi
-}
-
-# Install zinc under the bin/ folder
-install_zinc() {
-  local zinc_path="zinc-0.3.9/bin/zinc"
-  [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
-  install_app \
-    "http://downloads.typesafe.com/zinc/0.3.9" \
-    "zinc-0.3.9.tgz" \
-    "${zinc_path}"
-  ZINC_BIN="${_DIR}/${zinc_path}"
-}
-
-# Setup healthy defaults for the Zinc port if none were provided from
-# the environment
-ZINC_PORT=${ZINC_PORT:-"3030"}
-
-# Install Zinc for the bin/
-install_zinc
-
-# Reset the current working directory
-cd "${_CALLING_DIR}"
-
-# Now that zinc is ensured to be installed, check its status and, if its
-# not running or just installed, start it
-if [ ! -f "${ZINC_BIN}" ]; then
-  exit -1
-fi
-if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
-  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
-  "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
-  "${ZINC_BIN}" -start -port ${ZINC_PORT} &>/dev/null
-fi
-
-# Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
-
-# Last, call the `mvn` command as usual
-mvn -DzincPort=${ZINC_PORT} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/README
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/README b/spark/spark-1.6/extra-src/README
deleted file mode 100644
index 8b5d0cd..0000000
--- a/spark/spark-1.6/extra-src/README
+++ /dev/null
@@ -1 +0,0 @@
-Copyed from spark master [commit 908e37bcc10132bb2aa7f80ae694a9df6e40f31a]

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
deleted file mode 100644
index 6da54f7..0000000
--- a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.client
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
-import java.net.URI
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{IntegralType, StringType}
-
-/**
- * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
- * talk to the metastore. Each Hive version has its own implementation of this class, defining
- * version-specific version of needed functions.
- *
- * The guideline for writing shims is:
- * - always extend from the previous version unless really not possible
- * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to
- *   avoid runtime errors due to the above guideline.
- */
-private[client] sealed abstract class Shim {
-
-  /**
-   * Set the current SessionState to the given SessionState. Also, set the context classloader of
-   * the current thread to the one set in the HiveConf of this given `state`.
-   * @param state
-   */
-  def setCurrentSessionState(state: SessionState): Unit
-
-  /**
-   * This shim is necessary because the return type is different on different versions of Hive.
-   * All parameters are the same, though.
-   */
-  def getDataLocation(table: Table): Option[String]
-
-  def setDataLocation(table: Table, loc: String): Unit
-
-  def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
-
-  def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
-  def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
-
-  def getDriverResults(driver: Driver): Seq[String]
-
-  def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
-
-  def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
-
-  def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit
-
-  def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit
-
-  def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
-
-  protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
-    val method = findMethod(klass, name, args: _*)
-    require(Modifier.isStatic(method.getModifiers()),
-      s"Method $name of class $klass is not static.")
-    method
-  }
-
-  protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
-    klass.getMethod(name, args: _*)
-  }
-
-}
-
-private[client] class Shim_v0_12 extends Shim with Logging {
-
-  private lazy val startMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "start",
-      classOf[SessionState])
-  private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation")
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[URI])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsForPruner",
-      classOf[Table])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[String],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JArrayList[String]])
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val dropIndexMethod =
-    findMethod(
-      classOf[Hive],
-      "dropIndex",
-      classOf[String],
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE)
-
-  override def setCurrentSessionState(state: SessionState): Unit = {
-    // Starting from Hive 0.13, setCurrentSessionState will internally override
-    // the context class loader of the current thread by the class loader set in
-    // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same
-    // behavior and make shim.setCurrentSessionState of all Hive versions have the
-    // consistent behavior.
-    Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
-    startMethod.invoke(null, state)
-  }
-
-  override def getDataLocation(table: Table): Option[String] =
-    Option(getDataLocationMethod.invoke(table)).map(_.toString())
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new URI(loc))
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
-    getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-    // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
-    // See HIVE-4888.
-    logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
-      "Please use Hive 0.13 or higher.")
-    getAllPartitions(hive, table)
-  }
-
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
-
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[String]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala
-  }
-
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
-  }
-
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
-  }
-
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
-  }
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
-  }
-
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
-  }
-
-}
-
-private[client] class Shim_v0_13 extends Shim_v0_12 {
-
-  private lazy val setCurrentSessionStateMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "setCurrentSessionState",
-      classOf[SessionState])
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[Path])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsOf",
-      classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[Array[String]],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JList[Object]])
-
-  override def setCurrentSessionState(state: SessionState): Unit =
-    setCurrentSessionStateMethod.invoke(null, state)
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new Path(loc))
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
-    getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
-  /**
-   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
-   * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
-   *
-   * Unsupported predicates are skipped.
-   */
-  def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.collect {
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
-          if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} "$v""""
-      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s""""$v" ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
-  }
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates)
-    val partitions =
-      if (filter.isEmpty) {
-        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
-      }
-
-    partitions.asScala.toSeq
-  }
-
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
-
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[Object]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala.map { r =>
-      r match {
-        case s: String => s
-        case a: Array[Object] => a(0).asInstanceOf[String]
-      }
-    }
-  }
-
-}
-
-private[client] class Shim_v0_14 extends Shim_v0_13 {
-
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val getTimeVarMethod =
-    findMethod(
-      classOf[HiveConf],
-      "getTimeVar",
-      classOf[HiveConf.ConfVars],
-      classOf[TimeUnit])
-
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
-  }
-
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
-  }
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
-  }
-
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    getTimeVarMethod.invoke(
-      conf,
-      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
-      TimeUnit.MILLISECONDS).asInstanceOf[Long]
-  }
-
-  protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
-    val localFs = FileSystem.getLocal(conf)
-    val pathFs = FileSystem.get(path.toUri(), conf)
-    localFs.getUri() == pathFs.getUri()
-  }
-
-}
-
-private[client] class Shim_v1_0 extends Shim_v0_14 {
-
-}
-
-private[client] class Shim_v1_1 extends Shim_v1_0 {
-
-  private lazy val dropIndexMethod =
-    findMethod(
-      classOf[Hive],
-      "dropIndex",
-      classOf[String],
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
-  }
-
-}
-
-private[client] class Shim_v1_2 extends Shim_v1_1 {
-
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JLong.TYPE)
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE,
-      0L: JLong)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/pom.xml b/spark/spark-1.6/pom.xml
deleted file mode 100644
index 98bff6b..0000000
--- a/spark/spark-1.6/pom.xml
+++ /dev/null
@@ -1,261 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>io.github.myui</groupId>
-               <artifactId>hivemall</artifactId>
-               <version>0.4.2-rc.2</version>
-               <relativePath>../../pom.xml</relativePath>
-       </parent>
-
-       <artifactId>hivemall-spark</artifactId>
-       <name>Hivemall on Spark 1.6</name>
-       <packaging>jar</packaging>
-
-       <properties>
-               <PermGen>64m</PermGen>
-               <MaxPermGen>512m</MaxPermGen>
-               <CodeCacheSize>512m</CodeCacheSize>
-               <main.basedir>${project.parent.basedir}</main.basedir>
-       </properties>
-
-       <dependencies>
-               <!-- hivemall dependencies -->
-               <dependency>
-                       <groupId>io.github.myui</groupId>
-                       <artifactId>hivemall-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>compile</scope>
-               </dependency>
-               <dependency>
-                       <groupId>io.github.myui</groupId>
-                       <artifactId>hivemall-spark-common</artifactId>
-                       <version>${project.version}</version>
-                       <scope>compile</scope>
-               </dependency>
-
-               <!-- third-party dependencies -->
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-library</artifactId>
-                       <version>${scala.version}</version>
-                       <scope>compile</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-compress</artifactId>
-                       <version>1.8</version>
-                       <scope>compile</scope>
-               </dependency>
-
-               <!-- other provided dependencies -->
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       
<artifactId>spark-core_${scala.binary.version}</artifactId>
-                       <version>${spark.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       
<artifactId>spark-sql_${scala.binary.version}</artifactId>
-                       <version>${spark.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       
<artifactId>spark-hive_${scala.binary.version}</artifactId>
-                       <version>${spark.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
-                       <version>${spark.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
-                       <version>${spark.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <!-- test dependencies -->
-               <dependency>
-                       <groupId>io.github.myui</groupId>
-                       <artifactId>hivemall-mixserv</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.xerial</groupId>
-                       <artifactId>xerial-core</artifactId>
-                       <version>3.2.3</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.scalatest</groupId>
-                       
<artifactId>scalatest_${scala.binary.version}</artifactId>
-                       <version>2.2.4</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <directory>target</directory>
-               <outputDirectory>target/classes</outputDirectory>
-               
<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
-               <testOutputDirectory>target/test-classes</testOutputDirectory>
-               <plugins>
-                       <!-- For incremental compilation -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.2.2</version>
-                               <executions>
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-                                       <execution>
-                                               
<id>scala-test-compile-first</id>
-                                               
<phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       
<scalaVersion>${scala.version}</scalaVersion>
-                                       
<recompileMode>incremental</recompileMode>
-                                       <useZincServer>true</useZincServer>
-                                       <args>
-                                               <arg>-unchecked</arg>
-                                               <arg>-deprecation</arg>
-                                               <!-- TODO: To enable this 
option, we need to fix many wornings -->
-                                               <!-- <arg>-feature</arg> -->
-                                       </args>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms1024m</jvmArg>
-                                               <jvmArg>-Xmx1024m</jvmArg>
-                                               
<jvmArg>-XX:PermSize=${PermGen}</jvmArg>
-                                               
<jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
-                                               
<jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-                       <!-- hivemall-spark_xx-xx.jar -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <version>2.5</version>
-                               <configuration>
-                                       
<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
-                                       
<outputDirectory>${project.parent.build.directory}</outputDirectory>
-                               </configuration>
-                       </plugin>
-                       <!-- hivemall-spark_xx-xx-with-dependencies.jar 
including minimum dependencies -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <version>2.3</version>
-                               <executions>
-                                       <execution>
-                                               <id>jar-with-dependencies</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}-with-dependencies</finalName>
-                                                       
<outputDirectory>${project.parent.build.directory}</outputDirectory>
-                                                       
<minimizeJar>false</minimizeJar>
-                                                       
<createDependencyReducedPom>false</createDependencyReducedPom>
-                                                       <artifactSet>
-                                                               <includes>
-                                                                       
<include>io.github.myui:hivemall-core</include>
-                                                                       
<include>io.github.myui:hivemall-spark-common</include>
-                                                                       
<include>com.github.haifengl:smile-core</include>
-                                                                       
<include>com.github.haifengl:smile-math</include>
-                                                                       
<include>com.github.haifengl:smile-data</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!-- disable surefire because there is no java test -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <version>2.7</version>
-                               <configuration>
-                                       <skipTests>true</skipTests>
-                               </configuration>
-                       </plugin>
-                       <!-- then, enable scalatest -->
-                       <plugin>
-                               <groupId>org.scalatest</groupId>
-                               <artifactId>scalatest-maven-plugin</artifactId>
-                               <version>1.0</version>
-                               <!-- Note config is repeated in surefire config 
-->
-                               <configuration>
-                                       
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                                       <junitxml>.</junitxml>
-                                       
<filereports>SparkTestSuite.txt</filereports>
-                                       <argLine>-ea -Xmx2g 
-XX:MaxPermSize=${MaxPermGen} 
-XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
-                                       <stderr/>
-                                       <environmentVariables>
-                                               
<SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
-                                               
<SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
-                                               <SPARK_TESTING>1</SPARK_TESTING>
-                                               
<JAVA_HOME>${env.JAVA_HOME}</JAVA_HOME>
-                                       </environmentVariables>
-                                       <systemProperties>
-                                               
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
-                                               
<derby.system.durability>test</derby.system.durability>
-                                               
<java.awt.headless>true</java.awt.headless>
-                                               
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
-                                               <spark.testing>1</spark.testing>
-                                               
<spark.ui.enabled>false</spark.ui.enabled>
-                                               
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
-                                               
<spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
-                                               <!-- Needed by sql/hive tests. 
-->
-                                               
<test.src.tables>__not_used__</test.src.tables>
-                                       </systemProperties>
-                                       
<tagsToExclude>${test.exclude.tags}</tagsToExclude>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>test</id>
-                                               <goals>
-                                                       <goal>test</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/resources/log4j.properties 
b/spark/spark-1.6/src/main/resources/log4j.properties
deleted file mode 100644
index 72bf5b6..0000000
--- a/spark/spark-1.6/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.eclipse.jetty=INFO
-log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala 
b/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
deleted file mode 100644
index 01664f4..0000000
--- a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools
-
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.types._
-
-object RegressionDatagen {
-
-  /**
-   * Generate data for regression/classification.
-   * See [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]]
-   * for the details of arguments below.
-   */
-  def exec(sc: SQLContext,
-           n_partitions: Int = 2,
-           min_examples: Int = 1000,
-           n_features: Int = 10,
-           n_dims: Int = 200,
-           seed: Int = 43,
-           dense: Boolean = false,
-           prob_one: Float = 0.6f,
-           sort: Boolean = false,
-           cl: Boolean = false): DataFrame = {
-
-    require(n_partitions > 0, "Non-negative #n_partitions required.")
-    require(min_examples > 0, "Non-negative #min_examples required.")
-    require(n_features > 0, "Non-negative #n_features required.")
-    require(n_dims > 0, "Non-negative #n_dims required.")
-
-    // Calculate #examples to generate in each partition
-    val n_examples = (min_examples + n_partitions - 1) / n_partitions
-
-    val df = sc.createDataFrame(
-        sc.sparkContext.parallelize((0 until n_partitions).map(Row(_)), 
n_partitions),
-        StructType(
-          StructField("data", IntegerType, true) ::
-          Nil)
-      )
-    import sc.implicits._
-    df.lr_datagen(
-      s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims 
-prob_one $prob_one"
-        + (if (dense) " -dense" else "")
-        + (if (sort) " -sort" else "")
-        + (if (cl) " -cl" else ""))
-      .select($"label".cast(DoubleType).as("label"), $"features")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala 
b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
deleted file mode 100644
index 18ef9df..0000000
--- 
a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{AnalysisException, DataFrame, GroupedData}
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, 
UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Cube, Pivot, 
Rollup}
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-
-final class GroupedDataEx protected[sql](
-    df: DataFrame,
-    groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType)
-  extends GroupedData(df, groupingExprs, groupType) {
-
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-  // toDF, alias, and strToExpr are copyed from the base class, GroupedData, 
because
-  // these methods have 'private[this]' modifiers.
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-
-  private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
-    val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
-      groupingExprs ++ aggExprs
-    } else {
-      aggExprs
-    }
-
-    val aliasedAgg = aggregates.map(alias)
-
-    groupType match {
-      case GroupedData.GroupByType =>
-        DataFrame(
-          df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
-      case GroupedData.RollupType =>
-        DataFrame(
-          df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.CubeType =>
-        DataFrame(
-          df.sqlContext, Cube(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.PivotType(pivotCol, values) =>
-        val aliasedGrps = groupingExprs.map(alias)
-        DataFrame(
-          df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, 
df.logicalPlan))
-    }
-  }
-
-  private[this] def alias(expr: Expression): NamedExpression = expr match {
-    case u: UnresolvedAttribute => UnresolvedAlias(u)
-    case expr: NamedExpression => expr
-    case expr: Expression => Alias(expr, expr.prettyString)()
-  }
-
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    val exprToFunc: (Expression => Expression) = {
-      (inputExpr: Expression) => expr.toLowerCase match {
-        case "avg" | "average" | "mean" =>
-          UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
-        case "stddev" | "std" =>
-          UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
-        // Also special handle count because we need to take care count(*).
-        case "count" | "size" =>
-          // Turn count(*) into count(1)
-          inputExpr match {
-            case s: Star => Count(Literal(1)).toAggregateExpression()
-            case _ => Count(inputExpr).toAggregateExpression()
-          }
-        case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = 
false)
-      }
-    }
-    (inputExpr: Expression) => exprToFunc(inputExpr)
-  }
-
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-
-  // `agg` only supports aggregation functions with a single argument
-  override def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
-      val a = expr match {
-        case "voted_avg" =>
-          HiveUDAFFunction(
-            new HiveFunctionWrapper("hivemall.ensemble.bagging.VotedAvgUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression
-        case "weight_voted_avg" =>
-          HiveUDAFFunction(
-            new 
HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression()
-        case "rf_ensemble" =>
-          HiveUDAFFunction(
-            new 
HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression()
-        case _ =>
-          strToExpr(expr)(df(colName).expr)
-      }
-      Alias(a, a.prettyString)()
-    }.toSeq)
-  }
-
-  private[this] def checkType(colName: String, expected: DataType) = {
-    val dataType = df.resolve(colName).dataType
-    if (dataType != expected) {
-      throw new AnalysisException(
-        s""""$colName" must be $expected, however it is $dataType""")
-    }
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.VotedAvgUDAF
-   */
-  def voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    val udaf = HiveUDAFFunction(
-        new 
HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-        Seq(weight).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF
-   */
-  def weight_voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    val udaf = HiveUDAFFunction(
-        new 
HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-        Seq(weight).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.ArgminKLDistanceUDAF
-   */
-  def argmin_kld(weight: String, conv: String): DataFrame = {
-    // checkType(weight, NumericType)
-    // checkType(conv, NumericType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"),
-        Seq(weight, conv).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxValueLabelUDAF
-   */
-  def max_label(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"),
-        Seq(score, label).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxRowUDAF
-   */
-  def maxrow(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"),
-        Seq(score, label).map(df.col(_).expr),
-        isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.smile.tools.RandomForestEnsembleUDAF
-   */
-  def rf_ensemble(predict: String): DataFrame = {
-    // checkType(predict, NumericType)
-    val udaf = HiveUDAFFunction(
-        new 
HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"),
-        Seq(predict).map(df.col(_).expr),
-        isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanAbsoluteErrorUDAF
-   */
-  def mae(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanSquareErrorUDAF
-   */
-  def mse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.RootMeanSquareErrorUDAF
-   */
-  def rmse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.FMeasureUDAF
-   */
-  def f1score(predict: String, target: String): DataFrame = {
-    // checkType(target, ArrayType(IntegerType))
-    // checkType(predict, ArrayType(IntegerType))
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.FMeasureUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.OnehotEncodingUDAF
-   */
-  def onehot_encoding(features: String*): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"),
-      features.map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-
-  /**
-   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
-   */
-  def snr(X: String, Y: String): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"),
-      Seq(X, Y).map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-
-  /**
-   * @see hivemall.tools.matrix.TransposeAndDotUDAF
-   */
-  def transpose_and_dot(X: String, Y: String): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"),
-      Seq(X, Y).map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-}

Reply via email to