Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/c53b9ff9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/c53b9ff9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/c53b9ff9 Branch: refs/heads/master Commit: c53b9ff9b23755c7d211390ea97bc911a80674bf Parents: 8dc3a02 Author: Takeshi Yamamuro <[email protected]> Authored: Mon Apr 10 06:43:20 2017 +0900 Committer: myui <[email protected]> Committed: Mon Apr 10 06:43:20 2017 +0900 ---------------------------------------------------------------------- .travis.yml | 3 +- NOTICE | 1 - bin/format_header.sh | 3 - pom.xml | 11 - spark/spark-1.6/bin/mvn-zinc | 99 -- spark/spark-1.6/extra-src/README | 1 - .../org/apache/spark/sql/hive/HiveShim.scala | 527 -------- spark/spark-1.6/pom.xml | 261 ---- .../src/main/resources/log4j.properties | 12 - .../hivemall/tools/RegressionDatagen.scala | 66 - .../apache/spark/sql/hive/GroupedDataEx.scala | 303 ----- .../org/apache/spark/sql/hive/HivemallOps.scala | 1125 ------------------ .../apache/spark/sql/hive/HivemallUtils.scala | 112 -- .../src/test/resources/log4j.properties | 7 - .../hivemall/mix/server/MixServerSuite.scala | 123 -- .../hivemall/tools/RegressionDatagenSuite.scala | 32 - .../scala/org/apache/spark/SparkFunSuite.scala | 48 - .../ml/feature/HivemallLabeledPointSuite.scala | 35 - .../scala/org/apache/spark/sql/QueryTest.scala | 295 ----- .../spark/sql/catalyst/plans/PlanTest.scala | 61 - .../apache/spark/sql/hive/HiveUdfSuite.scala | 113 -- .../spark/sql/hive/HivemallOpsSuite.scala | 665 ----------- .../spark/sql/hive/ModelMixingSuite.scala | 272 ----- .../spark/streaming/HivemallOpsSuite.scala | 123 -- .../apache/spark/test/HivemallQueryTest.scala | 197 --- .../scala/org/apache/spark/test/TestUtils.scala | 62 - 26 files changed, 1 insertion(+), 4556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index a4cb7ea..323e36a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -35,9 +35,8 @@ notifications: script: - mvn -q scalastyle:check test -Pspark-2.1 - # test the spark-1.6/2.0 modules only in the following runs + # test the spark-2.0 modules only in the following runs - mvn -q scalastyle:check clean -Pspark-2.0 -pl spark/spark-2.0 -am test -Dtest=none - - mvn -q scalastyle:check clean -Pspark-1.6 -pl spark/spark-1.6 -am test -Dtest=none after_success: - mvn clean cobertura:cobertura coveralls:report http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 0911f50..79a944c 100644 --- a/NOTICE +++ b/NOTICE @@ -50,7 +50,6 @@ o hivemall/core/src/main/java/hivemall/utils/buffer/DynamicByteArray.java https://orc.apache.org/ Licensed under the Apache License, Version 2.0 -o hivemall/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala hivemall/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/QueryTest.scala hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/bin/format_header.sh ---------------------------------------------------------------------- diff --git a/bin/format_header.sh b/bin/format_header.sh index da67420..f2c063b 100755 --- a/bin/format_header.sh +++ b/bin/format_header.sh @@ -37,8 +37,5 @@ mvn license:format cd $HIVEMALL_HOME/spark/spark-common mvn license:format -P spark-2.0 -cd $HIVEMALL_HOME/spark/spark-1.6 -mvn license:format -P spark-1.6 - cd $HIVEMALL_HOME/spark/spark-2.0 mvn license:format -P spark-2.0 http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a2276e9..7743d5a 100644 --- a/pom.xml +++ b/pom.xml @@ -293,17 +293,6 @@ </properties> </profile> <profile> - <id>spark-1.6</id> - <modules> - <module>spark/spark-1.6</module> - <module>spark/spark-common</module> - </modules> - <properties> - <spark.version>1.6.2</spark.version> - <spark.binary.version>1.6</spark.binary.version> - </properties> - </profile> - <profile> <id>compile-xgboost</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/bin/mvn-zinc ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/bin/mvn-zinc b/spark/spark-1.6/bin/mvn-zinc deleted file mode 100755 index 759b0a5..0000000 --- a/spark/spark-1.6/bin/mvn-zinc +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Copyed from commit 48682f6bf663e54cb63b7e95a4520d34b6fa890b in Apache Spark - -# Determine the current working directory -_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -# Preserve the calling directory -_CALLING_DIR="$(pwd)" -# Options used during compilation -_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - -# Installs any application tarball given a URL, the expected tarball name, -# and, optionally, a checkable binary path to determine if the binary has -# already been installed -## Arg1 - URL -## Arg2 - Tarball Name -## Arg3 - Checkable Binary -install_app() { - local remote_tarball="$1/$2" - local local_tarball="${_DIR}/$2" - local binary="${_DIR}/$3" - local curl_opts="--progress-bar -L" - local wget_opts="--progress=bar:force ${wget_opts}" - - if [ -z "$3" -o ! -f "$binary" ]; then - # check if we already have the tarball - # check if we have curl installed - # download application - [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ - echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ - curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" - # if the file still doesn't exist, lets try `wget` and cross our fingers - [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ - echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ - wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" - # if both were unsuccessful, exit - [ ! -f "${local_tarball}" ] && \ - echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ - echo "please install manually and try again." && \ - exit 2 - cd "${_DIR}" && tar -xzf "$2" - rm -rf "$local_tarball" - fi -} - -# Install zinc under the bin/ folder -install_zinc() { - local zinc_path="zinc-0.3.9/bin/zinc" - [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 - install_app \ - "http://downloads.typesafe.com/zinc/0.3.9" \ - "zinc-0.3.9.tgz" \ - "${zinc_path}" - ZINC_BIN="${_DIR}/${zinc_path}" -} - -# Setup healthy defaults for the Zinc port if none were provided from -# the environment -ZINC_PORT=${ZINC_PORT:-"3030"} - -# Install Zinc for the bin/ -install_zinc - -# Reset the current working directory -cd "${_CALLING_DIR}" - -# Now that zinc is ensured to be installed, check its status and, if its -# not running or just installed, start it -if [ ! -f "${ZINC_BIN}" ]; then - exit -1 -fi -if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} - "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - "${ZINC_BIN}" -start -port ${ZINC_PORT} &>/dev/null -fi - -# Set any `mvn` options if not already present -export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"} - -# Last, call the `mvn` command as usual -mvn -DzincPort=${ZINC_PORT} "$@" http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/README ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/extra-src/README b/spark/spark-1.6/extra-src/README deleted file mode 100644 index 8b5d0cd..0000000 --- a/spark/spark-1.6/extra-src/README +++ /dev/null @@ -1 +0,0 @@ -Copyed from spark master [commit 908e37bcc10132bb2aa7f80ae694a9df6e40f31a] http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala deleted file mode 100644 index 6da54f7..0000000 --- a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.hive.client - -import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong} -import java.lang.reflect.{Method, Modifier} -import java.net.URI -import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet} -import java.util.concurrent.TimeUnit - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} -import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.serde.serdeConstants - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{IntegralType, StringType} - -/** - * A shim that defines the interface between ClientWrapper and the underlying Hive library used to - * talk to the metastore. Each Hive version has its own implementation of this class, defining - * version-specific version of needed functions. - * - * The guideline for writing shims is: - * - always extend from the previous version unless really not possible - * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to - * avoid runtime errors due to the above guideline. - */ -private[client] sealed abstract class Shim { - - /** - * Set the current SessionState to the given SessionState. Also, set the context classloader of - * the current thread to the one set in the HiveConf of this given `state`. - * @param state - */ - def setCurrentSessionState(state: SessionState): Unit - - /** - * This shim is necessary because the return type is different on different versions of Hive. - * All parameters are the same, though. - */ - def getDataLocation(table: Table): Option[String] - - def setDataLocation(table: Table, loc: String): Unit - - def getAllPartitions(hive: Hive, table: Table): Seq[Partition] - - def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition] - - def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor - - def getDriverResults(driver: Driver): Seq[String] - - def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long - - def loadPartition( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit - - def loadTable( - hive: Hive, - loadPath: Path, - tableName: String, - replace: Boolean, - holdDDLTime: Boolean): Unit - - def loadDynamicPartitions( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit - - def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit - - protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { - val method = findMethod(klass, name, args: _*) - require(Modifier.isStatic(method.getModifiers()), - s"Method $name of class $klass is not static.") - method - } - - protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { - klass.getMethod(name, args: _*) - } - -} - -private[client] class Shim_v0_12 extends Shim with Logging { - - private lazy val startMethod = - findStaticMethod( - classOf[SessionState], - "start", - classOf[SessionState]) - private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation") - private lazy val setDataLocationMethod = - findMethod( - classOf[Table], - "setDataLocation", - classOf[URI]) - private lazy val getAllPartitionsMethod = - findMethod( - classOf[Hive], - "getAllPartitionsForPruner", - classOf[Table]) - private lazy val getCommandProcessorMethod = - findStaticMethod( - classOf[CommandProcessorFactory], - "get", - classOf[String], - classOf[HiveConf]) - private lazy val getDriverResultsMethod = - findMethod( - classOf[Driver], - "getResults", - classOf[JArrayList[String]]) - private lazy val loadPartitionMethod = - findMethod( - classOf[Hive], - "loadPartition", - classOf[Path], - classOf[String], - classOf[JMap[String, String]], - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val loadTableMethod = - findMethod( - classOf[Hive], - "loadTable", - classOf[Path], - classOf[String], - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val loadDynamicPartitionsMethod = - findMethod( - classOf[Hive], - "loadDynamicPartitions", - classOf[Path], - classOf[String], - classOf[JMap[String, String]], - JBoolean.TYPE, - JInteger.TYPE, - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val dropIndexMethod = - findMethod( - classOf[Hive], - "dropIndex", - classOf[String], - classOf[String], - classOf[String], - JBoolean.TYPE) - - override def setCurrentSessionState(state: SessionState): Unit = { - // Starting from Hive 0.13, setCurrentSessionState will internally override - // the context class loader of the current thread by the class loader set in - // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same - // behavior and make shim.setCurrentSessionState of all Hive versions have the - // consistent behavior. - Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader) - startMethod.invoke(null, state) - } - - override def getDataLocation(table: Table): Option[String] = - Option(getDataLocationMethod.invoke(table)).map(_.toString()) - - override def setDataLocation(table: Table, loc: String): Unit = - setDataLocationMethod.invoke(table, new URI(loc)) - - override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = - getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq - - override def getPartitionsByFilter( - hive: Hive, - table: Table, - predicates: Seq[Expression]): Seq[Partition] = { - // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12. - // See HIVE-4888. - logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " + - "Please use Hive 0.13 or higher.") - getAllPartitions(hive, table) - } - - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] - - override def getDriverResults(driver: Driver): Seq[String] = { - val res = new JArrayList[String]() - getDriverResultsMethod.invoke(driver, res) - res.asScala - } - - override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { - conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000 - } - - override def loadPartition( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { - loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, - holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean) - } - - override def loadTable( - hive: Hive, - loadPath: Path, - tableName: String, - replace: Boolean, - holdDDLTime: Boolean): Unit = { - loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean) - } - - override def loadDynamicPartitions( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit = { - loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, - numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean) - } - - override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = { - dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean) - } - -} - -private[client] class Shim_v0_13 extends Shim_v0_12 { - - private lazy val setCurrentSessionStateMethod = - findStaticMethod( - classOf[SessionState], - "setCurrentSessionState", - classOf[SessionState]) - private lazy val setDataLocationMethod = - findMethod( - classOf[Table], - "setDataLocation", - classOf[Path]) - private lazy val getAllPartitionsMethod = - findMethod( - classOf[Hive], - "getAllPartitionsOf", - classOf[Table]) - private lazy val getPartitionsByFilterMethod = - findMethod( - classOf[Hive], - "getPartitionsByFilter", - classOf[Table], - classOf[String]) - private lazy val getCommandProcessorMethod = - findStaticMethod( - classOf[CommandProcessorFactory], - "get", - classOf[Array[String]], - classOf[HiveConf]) - private lazy val getDriverResultsMethod = - findMethod( - classOf[Driver], - "getResults", - classOf[JList[Object]]) - - override def setCurrentSessionState(state: SessionState): Unit = - setCurrentSessionStateMethod.invoke(null, state) - - override def setDataLocation(table: Table, loc: String): Unit = - setDataLocationMethod.invoke(table, new Path(loc)) - - override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = - getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq - - /** - * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e. - * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...". - * - * Unsupported predicates are skipped. - */ - def convertFilters(table: Table, filters: Seq[Expression]): String = { - // hive varchar is treated as catalyst string, but hive varchar can't be pushed down. - val varcharKeys = table.getPartitionKeys.asScala - .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) || - col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME)) - .map(col => col.getName).toSet - - filters.collect { - case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) => - s"${a.name} ${op.symbol} $v" - case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) => - s"$v ${op.symbol} ${a.name}" - case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType)) - if !varcharKeys.contains(a.name) => - s"""${a.name} ${op.symbol} "$v"""" - case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute) - if !varcharKeys.contains(a.name) => - s""""$v" ${op.symbol} ${a.name}""" - }.mkString(" and ") - } - - override def getPartitionsByFilter( - hive: Hive, - table: Table, - predicates: Seq[Expression]): Seq[Partition] = { - - // Hive getPartitionsByFilter() takes a string that represents partition - // predicates like "str_key=\"value\" and int_key=1 ..." - val filter = convertFilters(table, predicates) - val partitions = - if (filter.isEmpty) { - getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] - } else { - logDebug(s"Hive metastore filter is '$filter'.") - getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]] - } - - partitions.asScala.toSeq - } - - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor] - - override def getDriverResults(driver: Driver): Seq[String] = { - val res = new JArrayList[Object]() - getDriverResultsMethod.invoke(driver, res) - res.asScala.map { r => - r match { - case s: String => s - case a: Array[Object] => a(0).asInstanceOf[String] - } - } - } - -} - -private[client] class Shim_v0_14 extends Shim_v0_13 { - - private lazy val loadPartitionMethod = - findMethod( - classOf[Hive], - "loadPartition", - classOf[Path], - classOf[String], - classOf[JMap[String, String]], - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val loadTableMethod = - findMethod( - classOf[Hive], - "loadTable", - classOf[Path], - classOf[String], - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val loadDynamicPartitionsMethod = - findMethod( - classOf[Hive], - "loadDynamicPartitions", - classOf[Path], - classOf[String], - classOf[JMap[String, String]], - JBoolean.TYPE, - JInteger.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE) - private lazy val getTimeVarMethod = - findMethod( - classOf[HiveConf], - "getTimeVar", - classOf[HiveConf.ConfVars], - classOf[TimeUnit]) - - override def loadPartition( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { - loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, - holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, - isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE) - } - - override def loadTable( - hive: Hive, - loadPath: Path, - tableName: String, - replace: Boolean, - holdDDLTime: Boolean): Unit = { - loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean, - isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE) - } - - override def loadDynamicPartitions( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit = { - loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, - numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE) - } - - override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { - getTimeVarMethod.invoke( - conf, - HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, - TimeUnit.MILLISECONDS).asInstanceOf[Long] - } - - protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = { - val localFs = FileSystem.getLocal(conf) - val pathFs = FileSystem.get(path.toUri(), conf) - localFs.getUri() == pathFs.getUri() - } - -} - -private[client] class Shim_v1_0 extends Shim_v0_14 { - -} - -private[client] class Shim_v1_1 extends Shim_v1_0 { - - private lazy val dropIndexMethod = - findMethod( - classOf[Hive], - "dropIndex", - classOf[String], - classOf[String], - classOf[String], - JBoolean.TYPE, - JBoolean.TYPE) - - override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = { - dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean) - } - -} - -private[client] class Shim_v1_2 extends Shim_v1_1 { - - private lazy val loadDynamicPartitionsMethod = - findMethod( - classOf[Hive], - "loadDynamicPartitions", - classOf[Path], - classOf[String], - classOf[JMap[String, String]], - JBoolean.TYPE, - JInteger.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JBoolean.TYPE, - JLong.TYPE) - - override def loadDynamicPartitions( - hive: Hive, - loadPath: Path, - tableName: String, - partSpec: JMap[String, String], - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit = { - loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, - numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE, - 0L: JLong) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/pom.xml b/spark/spark-1.6/pom.xml deleted file mode 100644 index 98bff6b..0000000 --- a/spark/spark-1.6/pom.xml +++ /dev/null @@ -1,261 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>io.github.myui</groupId> - <artifactId>hivemall</artifactId> - <version>0.4.2-rc.2</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>hivemall-spark</artifactId> - <name>Hivemall on Spark 1.6</name> - <packaging>jar</packaging> - - <properties> - <PermGen>64m</PermGen> - <MaxPermGen>512m</MaxPermGen> - <CodeCacheSize>512m</CodeCacheSize> - <main.basedir>${project.parent.basedir}</main.basedir> - </properties> - - <dependencies> - <!-- hivemall dependencies --> - <dependency> - <groupId>io.github.myui</groupId> - <artifactId>hivemall-core</artifactId> - <version>${project.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>io.github.myui</groupId> - <artifactId>hivemall-spark-common</artifactId> - <version>${project.version}</version> - <scope>compile</scope> - </dependency> - - <!-- third-party dependencies --> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - <version>1.8</version> - <scope>compile</scope> - </dependency> - - <!-- other provided dependencies --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>io.github.myui</groupId> - <artifactId>hivemall-mixserv</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.xerial</groupId> - <artifactId>xerial-core</artifactId> - <version>3.2.3</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <version>2.2.4</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <directory>target</directory> - <outputDirectory>target/classes</outputDirectory> - <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName> - <testOutputDirectory>target/test-classes</testOutputDirectory> - <plugins> - <!-- For incremental compilation --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.2.2</version> - <executions> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${scala.version}</scalaVersion> - <recompileMode>incremental</recompileMode> - <useZincServer>true</useZincServer> - <args> - <arg>-unchecked</arg> - <arg>-deprecation</arg> - <!-- TODO: To enable this option, we need to fix many wornings --> - <!-- <arg>-feature</arg> --> - </args> - <jvmArgs> - <jvmArg>-Xms1024m</jvmArg> - <jvmArg>-Xmx1024m</jvmArg> - <jvmArg>-XX:PermSize=${PermGen}</jvmArg> - <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg> - <jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg> - </jvmArgs> - </configuration> - </plugin> - <!-- hivemall-spark_xx-xx.jar --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.5</version> - <configuration> - <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName> - <outputDirectory>${project.parent.build.directory}</outputDirectory> - </configuration> - </plugin> - <!-- hivemall-spark_xx-xx-with-dependencies.jar including minimum dependencies --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - <executions> - <execution> - <id>jar-with-dependencies</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}-with-dependencies</finalName> - <outputDirectory>${project.parent.build.directory}</outputDirectory> - <minimizeJar>false</minimizeJar> - <createDependencyReducedPom>false</createDependencyReducedPom> - <artifactSet> - <includes> - <include>io.github.myui:hivemall-core</include> - <include>io.github.myui:hivemall-spark-common</include> - <include>com.github.haifengl:smile-core</include> - <include>com.github.haifengl:smile-math</include> - <include>com.github.haifengl:smile-data</include> - </includes> - </artifactSet> - </configuration> - </execution> - </executions> - </plugin> - <!-- disable surefire because there is no java test --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.7</version> - <configuration> - <skipTests>true</skipTests> - </configuration> - </plugin> - <!-- then, enable scalatest --> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> - <!-- Note config is repeated in surefire config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>SparkTestSuite.txt</filereports> - <argLine>-ea -Xmx2g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine> - <stderr/> - <environmentVariables> - <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES> - <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION> - <SPARK_TESTING>1</SPARK_TESTING> - <JAVA_HOME>${env.JAVA_HOME}</JAVA_HOME> - </environmentVariables> - <systemProperties> - <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration> - <derby.system.durability>test</derby.system.durability> - <java.awt.headless>true</java.awt.headless> - <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir> - <spark.testing>1</spark.testing> - <spark.ui.enabled>false</spark.ui.enabled> - <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress> - <spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak> - <!-- Needed by sql/hive tests. --> - <test.src.tables>__not_used__</test.src.tables> - </systemProperties> - <tagsToExclude>${test.exclude.tags}</tagsToExclude> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/src/main/resources/log4j.properties b/spark/spark-1.6/src/main/resources/log4j.properties deleted file mode 100644 index 72bf5b6..0000000 --- a/spark/spark-1.6/src/main/resources/log4j.properties +++ /dev/null @@ -1,12 +0,0 @@ -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.eclipse.jetty=INFO -log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala b/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala deleted file mode 100644 index 01664f4..0000000 --- a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package hivemall.tools - -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.hive.HivemallUtils._ -import org.apache.spark.sql.types._ - -object RegressionDatagen { - - /** - * Generate data for regression/classification. - * See [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]] - * for the details of arguments below. - */ - def exec(sc: SQLContext, - n_partitions: Int = 2, - min_examples: Int = 1000, - n_features: Int = 10, - n_dims: Int = 200, - seed: Int = 43, - dense: Boolean = false, - prob_one: Float = 0.6f, - sort: Boolean = false, - cl: Boolean = false): DataFrame = { - - require(n_partitions > 0, "Non-negative #n_partitions required.") - require(min_examples > 0, "Non-negative #min_examples required.") - require(n_features > 0, "Non-negative #n_features required.") - require(n_dims > 0, "Non-negative #n_dims required.") - - // Calculate #examples to generate in each partition - val n_examples = (min_examples + n_partitions - 1) / n_partitions - - val df = sc.createDataFrame( - sc.sparkContext.parallelize((0 until n_partitions).map(Row(_)), n_partitions), - StructType( - StructField("data", IntegerType, true) :: - Nil) - ) - import sc.implicits._ - df.lr_datagen( - s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one" - + (if (dense) " -dense" else "") - + (if (sort) " -sort" else "") - + (if (cl) " -cl" else "")) - .select($"label".cast(DoubleType).as("label"), $"features") - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala deleted file mode 100644 index 18ef9df..0000000 --- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.hive - -import org.apache.spark.sql.{AnalysisException, DataFrame, GroupedData} -import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Cube, Pivot, Rollup} -import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.types._ - -final class GroupedDataEx protected[sql]( - df: DataFrame, - groupingExprs: Seq[Expression], - private val groupType: GroupedData.GroupType) - extends GroupedData(df, groupingExprs, groupType) { - - ////////////////////////////////////////////////////////////////////////////////////////////// - // toDF, alias, and strToExpr are copyed from the base class, GroupedData, because - // these methods have 'private[this]' modifiers. - ////////////////////////////////////////////////////////////////////////////////////////////// - - private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = { - val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { - groupingExprs ++ aggExprs - } else { - aggExprs - } - - val aliasedAgg = aggregates.map(alias) - - groupType match { - case GroupedData.GroupByType => - DataFrame( - df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan)) - case GroupedData.RollupType => - DataFrame( - df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aliasedAgg)) - case GroupedData.CubeType => - DataFrame( - df.sqlContext, Cube(groupingExprs, df.logicalPlan, aliasedAgg)) - case GroupedData.PivotType(pivotCol, values) => - val aliasedGrps = groupingExprs.map(alias) - DataFrame( - df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan)) - } - } - - private[this] def alias(expr: Expression): NamedExpression = expr match { - case u: UnresolvedAttribute => UnresolvedAlias(u) - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyString)() - } - - private[this] def strToExpr(expr: String): (Expression => Expression) = { - val exprToFunc: (Expression => Expression) = { - (inputExpr: Expression) => expr.toLowerCase match { - case "avg" | "average" | "mean" => - UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false) - case "stddev" | "std" => - UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false) - // Also special handle count because we need to take care count(*). - case "count" | "size" => - // Turn count(*) into count(1) - inputExpr match { - case s: Star => Count(Literal(1)).toAggregateExpression() - case _ => Count(inputExpr).toAggregateExpression() - } - case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false) - } - } - (inputExpr: Expression) => exprToFunc(inputExpr) - } - - ////////////////////////////////////////////////////////////////////////////////////////////// - ////////////////////////////////////////////////////////////////////////////////////////////// - - // `agg` only supports aggregation functions with a single argument - override def agg(exprs: Map[String, String]): DataFrame = { - toDF(exprs.map { case (colName, expr) => - val a = expr match { - case "voted_avg" => - HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.bagging.VotedAvgUDAF"), - Seq(df.col(colName).expr), - isUDAFBridgeRequired = true - ).toAggregateExpression - case "weight_voted_avg" => - HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(df.col(colName).expr), - isUDAFBridgeRequired = true - ).toAggregateExpression() - case "rf_ensemble" => - HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"), - Seq(df.col(colName).expr), - isUDAFBridgeRequired = true - ).toAggregateExpression() - case _ => - strToExpr(expr)(df(colName).expr) - } - Alias(a, a.prettyString)() - }.toSeq) - } - - private[this] def checkType(colName: String, expected: DataType) = { - val dataType = df.resolve(colName).dataType - if (dataType != expected) { - throw new AnalysisException( - s""""$colName" must be $expected, however it is $dataType""") - } - } - - /** - * @see hivemall.ensemble.bagging.VotedAvgUDAF - */ - def voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF - */ - def weight_voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.ArgminKLDistanceUDAF - */ - def argmin_kld(weight: String, conv: String): DataFrame = { - // checkType(weight, NumericType) - // checkType(conv, NumericType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"), - Seq(weight, conv).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.MaxValueLabelUDAF - */ - def max_label(score: String, label: String): DataFrame = { - // checkType(score, NumericType) - checkType(label, StringType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"), - Seq(score, label).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.MaxRowUDAF - */ - def maxrow(score: String, label: String): DataFrame = { - // checkType(score, NumericType) - checkType(label, StringType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"), - Seq(score, label).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.smile.tools.RandomForestEnsembleUDAF - */ - def rf_ensemble(predict: String): DataFrame = { - // checkType(predict, NumericType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"), - Seq(predict).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.MeanAbsoluteErrorUDAF - */ - def mae(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.MeanSquareErrorUDAF - */ - def mse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.RootMeanSquareErrorUDAF - */ - def rmse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.FMeasureUDAF - */ - def f1score(predict: String, target: String): DataFrame = { - // checkType(target, ArrayType(IntegerType)) - // checkType(predict, ArrayType(IntegerType)) - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.evaluation.FMeasureUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq) - } - - /** - * @see hivemall.ftvec.trans.OnehotEncodingUDAF - */ - def onehot_encoding(features: String*): DataFrame = { - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"), - features.map(df.col(_).expr), - isUDAFBridgeRequired = false) - toDF(Seq(Alias(udaf, udaf.prettyString)())) - } - - /** - * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF - */ - def snr(X: String, Y: String): DataFrame = { - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"), - Seq(X, Y).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyString)())) - } - - /** - * @see hivemall.tools.matrix.TransposeAndDotUDAF - */ - def transpose_and_dot(X: String, Y: String): DataFrame = { - val udaf = HiveUDAFFunction( - new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"), - Seq(X, Y).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyString)())) - } -}
