[HIVEMALL-180][SPARK] Drop the Spark-2.0 support ## What changes were proposed in this pull request? This PR drops the module for Spark 2.0.
## What type of PR is it? Improvement ## What is the Jira issue? https://issues.apache.org/jira/browse/HIVEMALL-180 ## How was this patch tested? Existing tests Author: Takeshi Yamamuro <[email protected]> Closes #138 from maropu/HIVEMALL-180. Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/cd24be89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/cd24be89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/cd24be89 Branch: refs/heads/master Commit: cd24be895cce02ade27b17830f8217f0a5fa0746 Parents: 629222d Author: Takeshi Yamamuro <[email protected]> Authored: Mon Apr 2 14:19:30 2018 +0900 Committer: Makoto Yui <[email protected]> Committed: Mon Apr 2 14:19:30 2018 +0900 ---------------------------------------------------------------------- bin/run_travis_tests.sh | 2 +- spark/pom.xml | 1 - spark/spark-2.0/bin/mvn-zinc | 99 -- spark/spark-2.0/extra-src/README | 1 - .../org/apache/spark/sql/hive/HiveShim.scala | 280 ---- spark/spark-2.0/pom.xml | 138 -- .../java/hivemall/xgboost/XGBoostOptions.scala | 58 - ....apache.spark.sql.sources.DataSourceRegister | 1 - .../src/main/resources/log4j.properties | 29 - .../hivemall/tools/RegressionDatagen.scala | 66 - .../sql/catalyst/expressions/EachTopK.scala | 116 -- .../spark/sql/hive/HivemallGroupedDataset.scala | 303 ---- .../org/apache/spark/sql/hive/HivemallOps.scala | 1381 ------------------ .../apache/spark/sql/hive/HivemallUtils.scala | 145 -- .../sql/hive/internal/HivemallOpsImpl.scala | 78 - .../sql/hive/source/XGBoostFileFormat.scala | 146 -- .../spark/streaming/HivemallStreamingOps.scala | 47 - .../src/test/resources/data/files/README.md | 22 - .../src/test/resources/data/files/complex.seq | 0 .../src/test/resources/data/files/episodes.avro | 0 .../src/test/resources/data/files/json.txt | 0 .../src/test/resources/data/files/kv1.txt | 0 .../src/test/resources/data/files/kv3.txt | 0 
.../src/test/resources/log4j.properties | 24 - .../hivemall/mix/server/MixServerSuite.scala | 125 -- .../hivemall/tools/RegressionDatagenSuite.scala | 32 - .../scala/org/apache/spark/SparkFunSuite.scala | 50 - .../ml/feature/HivemallLabeledPointSuite.scala | 35 - .../scala/org/apache/spark/sql/QueryTest.scala | 475 ------ .../spark/sql/catalyst/plans/PlanTest.scala | 61 - .../apache/spark/sql/hive/HiveUdfSuite.scala | 160 -- .../spark/sql/hive/HivemallOpsSuite.scala | 811 ---------- .../spark/sql/hive/ModelMixingSuite.scala | 285 ---- .../apache/spark/sql/hive/XGBoostSuite.scala | 143 -- .../sql/hive/benchmark/MiscBenchmark.scala | 224 --- .../hive/test/HivemallFeatureQueryTest.scala | 112 -- .../spark/sql/hive/test/TestHiveSingleton.scala | 37 - .../streaming/HivemallOpsWithFeatureSuite.scala | 154 -- .../scala/org/apache/spark/test/TestUtils.scala | 64 - .../org/apache/spark/test/VectorQueryTest.scala | 89 -- 40 files changed, 1 insertion(+), 5793 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/bin/run_travis_tests.sh ---------------------------------------------------------------------- diff --git a/bin/run_travis_tests.sh b/bin/run_travis_tests.sh index 9693520..b9e0763 100755 --- a/bin/run_travis_tests.sh +++ b/bin/run_travis_tests.sh @@ -35,7 +35,7 @@ cd $HIVEMALL_HOME/spark export MAVEN_OPTS="-XX:MaxPermSize=256m" -mvn -q scalastyle:check -pl spark-2.0,spark-2.1 -am test +mvn -q scalastyle:check -pl spark-2.1 -am test # spark-2.2 runs on Java 8+ if [[ ! 
-z "$(java -version 2>&1 | grep 1.8)" ]]; then http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 27bb6db..99553f9 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -32,7 +32,6 @@ <modules> <module>common</module> - <module>spark-2.0</module> <module>spark-2.1</module> <module>spark-2.2</module> <module>spark-2.3</module> http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/bin/mvn-zinc ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/bin/mvn-zinc b/spark/spark-2.0/bin/mvn-zinc deleted file mode 100755 index 759b0a5..0000000 --- a/spark/spark-2.0/bin/mvn-zinc +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# Copyed from commit 48682f6bf663e54cb63b7e95a4520d34b6fa890b in Apache Spark - -# Determine the current working directory -_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -# Preserve the calling directory -_CALLING_DIR="$(pwd)" -# Options used during compilation -_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - -# Installs any application tarball given a URL, the expected tarball name, -# and, optionally, a checkable binary path to determine if the binary has -# already been installed -## Arg1 - URL -## Arg2 - Tarball Name -## Arg3 - Checkable Binary -install_app() { - local remote_tarball="$1/$2" - local local_tarball="${_DIR}/$2" - local binary="${_DIR}/$3" - local curl_opts="--progress-bar -L" - local wget_opts="--progress=bar:force ${wget_opts}" - - if [ -z "$3" -o ! -f "$binary" ]; then - # check if we already have the tarball - # check if we have curl installed - # download application - [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ - echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ - curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" - # if the file still doesn't exist, lets try `wget` and cross our fingers - [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ - echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ - wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" - # if both were unsuccessful, exit - [ ! -f "${local_tarball}" ] && \ - echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ - echo "please install manually and try again." && \ - exit 2 - cd "${_DIR}" && tar -xzf "$2" - rm -rf "$local_tarball" - fi -} - -# Install zinc under the bin/ folder -install_zinc() { - local zinc_path="zinc-0.3.9/bin/zinc" - [ ! 
-f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 - install_app \ - "http://downloads.typesafe.com/zinc/0.3.9" \ - "zinc-0.3.9.tgz" \ - "${zinc_path}" - ZINC_BIN="${_DIR}/${zinc_path}" -} - -# Setup healthy defaults for the Zinc port if none were provided from -# the environment -ZINC_PORT=${ZINC_PORT:-"3030"} - -# Install Zinc for the bin/ -install_zinc - -# Reset the current working directory -cd "${_CALLING_DIR}" - -# Now that zinc is ensured to be installed, check its status and, if its -# not running or just installed, start it -if [ ! -f "${ZINC_BIN}" ]; then - exit -1 -fi -if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} - "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - "${ZINC_BIN}" -start -port ${ZINC_PORT} &>/dev/null -fi - -# Set any `mvn` options if not already present -export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"} - -# Last, call the `mvn` command as usual -mvn -DzincPort=${ZINC_PORT} "$@" http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/extra-src/README ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/extra-src/README b/spark/spark-2.0/extra-src/README deleted file mode 100644 index 8b5d0cd..0000000 --- a/spark/spark-2.0/extra-src/README +++ /dev/null @@ -1 +0,0 @@ -Copyed from spark master [commit 908e37bcc10132bb2aa7f80ae694a9df6e40f31a] http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala deleted file mode 100644 index a695841..0000000 --- 
a/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.hive - -import java.io.{InputStream, OutputStream} -import java.rmi.server.UID - -import scala.collection.JavaConverters._ -import scala.language.implicitConversions -import scala.reflect.ClassTag - -import com.google.common.base.Objects -import org.apache.avro.Schema -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.ql.exec.{UDF, Utilities} -import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils -import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils} -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector -import org.apache.hadoop.io.Writable -import org.apache.hive.com.esotericsoftware.kryo.Kryo -import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output} - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.types.Decimal -import 
org.apache.spark.util.Utils - -private[hive] object HiveShim { - // Precision and scale to pass for unlimited decimals; these are the same as the precision and - // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs) - val UNLIMITED_DECIMAL_PRECISION = 38 - val UNLIMITED_DECIMAL_SCALE = 18 - val HIVE_GENERIC_UDF_MACRO_CLS = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro" - - /* - * This function in hive-0.13 become private, but we have to do this to walkaround hive bug - */ - private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) { - val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "") - val result: StringBuilder = new StringBuilder(old) - var first: Boolean = old.isEmpty - - for (col <- cols) { - if (first) { - first = false - } else { - result.append(',') - } - result.append(col) - } - conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString) - } - - /* - * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty - */ - def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { - if (ids != null && ids.nonEmpty) { - ColumnProjectionUtils.appendReadColumns(conf, ids.asJava) - } - if (names != null && names.nonEmpty) { - appendReadColumnNames(conf, names) - } - } - - /* - * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that - * is needed to initialize before serialization. - */ - def prepareWritable(w: Writable, serDeProps: Seq[(String, String)]): Writable = { - w match { - case w: AvroGenericRecordWritable => - w.setRecordReaderID(new UID()) - // In Hive 1.1, the record's schema may need to be initialized manually or a NPE will - // be thrown. 
- if (w.getFileSchema() == null) { - serDeProps - .find(_._1 == AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()) - .foreach { kv => - w.setFileSchema(new Schema.Parser().parse(kv._2)) - } - } - case _ => - } - w - } - - def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = { - if (hdoi.preferWritable()) { - Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue, - hdoi.precision(), hdoi.scale()) - } else { - Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale()) - } - } - - /** - * This class provides the UDF creation and also the UDF instance serialization and - * de-serialization cross process boundary. - * - * Detail discussion can be found at https://github.com/apache/spark/pull/3640 - * - * @param functionClassName UDF class name - * @param instance optional UDF instance which contains additional information (for macro) - */ - private[hive] case class HiveFunctionWrapper(var functionClassName: String, - private var instance: AnyRef = null) extends java.io.Externalizable { - - // for Serialization - def this() = this(null) - - override def hashCode(): Int = { - if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) { - Objects.hashCode(functionClassName, instance.asInstanceOf[GenericUDFMacro].getBody()) - } else { - functionClassName.hashCode() - } - } - - override def equals(other: Any): Boolean = other match { - case a: HiveFunctionWrapper if functionClassName == a.functionClassName => - // In case of udf macro, check to make sure they point to the same underlying UDF - if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) { - a.instance.asInstanceOf[GenericUDFMacro].getBody() == - instance.asInstanceOf[GenericUDFMacro].getBody() - } else { - true - } - case _ => false - } - - @transient - def deserializeObjectByKryo[T: ClassTag]( - kryo: Kryo, - in: InputStream, - clazz: Class[_]): T = { - val inp = new Input(in) - val t: T = kryo.readObject(inp, 
clazz).asInstanceOf[T] - inp.close() - t - } - - @transient - def serializeObjectByKryo( - kryo: Kryo, - plan: Object, - out: OutputStream) { - val output: Output = new Output(out) - kryo.writeObject(output, plan) - output.close() - } - - def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = { - deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz) - .asInstanceOf[UDFType] - } - - def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = { - serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out) - } - - def writeExternal(out: java.io.ObjectOutput) { - // output the function name - out.writeUTF(functionClassName) - - // Write a flag if instance is null or not - out.writeBoolean(instance != null) - if (instance != null) { - // Some of the UDF are serializable, but some others are not - // Hive Utilities can handle both cases - val baos = new java.io.ByteArrayOutputStream() - serializePlan(instance, baos) - val functionInBytes = baos.toByteArray - - // output the function bytes - out.writeInt(functionInBytes.length) - out.write(functionInBytes, 0, functionInBytes.length) - } - } - - def readExternal(in: java.io.ObjectInput) { - // read the function name - functionClassName = in.readUTF() - - if (in.readBoolean()) { - // if the instance is not null - // read the function in bytes - val functionInBytesLength = in.readInt() - val functionInBytes = new Array[Byte](functionInBytesLength) - in.readFully(functionInBytes) - - // deserialize the function object via Hive Utilities - instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes), - Utils.getContextOrSparkClassLoader.loadClass(functionClassName)) - } - } - - def createFunction[UDFType <: AnyRef](): UDFType = { - if (instance != null) { - instance.asInstanceOf[UDFType] - } else { - val func = Utils.getContextOrSparkClassLoader - .loadClass(functionClassName).newInstance.asInstanceOf[UDFType] - if 
(!func.isInstanceOf[UDF]) { - // We cache the function if it's no the Simple UDF, - // as we always have to create new instance for Simple UDF - instance = func - } - func - } - } - } - - /* - * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not. - * Fix it through wrapper. - */ - implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = { - val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) - f.setCompressCodec(w.compressCodec) - f.setCompressType(w.compressType) - f.setTableInfo(w.tableInfo) - f.setDestTableId(w.destTableId) - f - } - - /* - * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not. - * Fix it through wrapper. - */ - private[hive] class ShimFileSinkDesc( - var dir: String, - var tableInfo: TableDesc, - var compressed: Boolean) - extends Serializable with Logging { - var compressCodec: String = _ - var compressType: String = _ - var destTableId: Int = _ - - def setCompressed(compressed: Boolean) { - this.compressed = compressed - } - - def getDirName(): String = dir - - def setDestTableId(destTableId: Int) { - this.destTableId = destTableId - } - - def setTableInfo(tableInfo: TableDesc) { - this.tableInfo = tableInfo - } - - def setCompressCodec(intermediateCompressorCodec: String) { - compressCodec = intermediateCompressorCodec - } - - def setCompressType(intermediateCompressType: String) { - compressType = intermediateCompressType - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/pom.xml b/spark/spark-2.0/pom.xml deleted file mode 100644 index 54c817d..0000000 --- a/spark/spark-2.0/pom.xml +++ /dev/null @@ -1,138 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.hivemall</groupId> - <artifactId>hivemall-spark</artifactId> - <version>0.5.1-incubating-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>hivemall-spark2.0</artifactId> - <name>Hivemall on Spark 2.0</name> - <packaging>jar</packaging> - - <properties> - <main.basedir>${project.parent.parent.basedir}</main.basedir> - <spark.version>2.0.2</spark.version> - <spark.binary.version>2.0</spark.binary.version> - </properties> - - <dependencies> - <!-- compile scope --> - <dependency> - <groupId>org.apache.hivemall</groupId> - <artifactId>hivemall-core</artifactId> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.hivemall</groupId> - <artifactId>hivemall-xgboost</artifactId> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.hivemall</groupId> - <artifactId>hivemall-spark-common</artifactId> - <version>${project.version}</version> - <scope>compile</scope> - </dependency> - - <!-- provided scope --> - <dependency> - 
<groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>org.apache.hivemall</groupId> - <artifactId>hivemall-mixserv</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <!-- hivemall-spark_xx-xx-with-dependencies.jar including minimum dependencies --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - </plugin> - <!-- disable surefire because there is no java test --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skipTests>true</skipTests> - </configuration> - </plugin> - <!-- then, enable scalatest --> - <plugin> - <groupId>org.scalatest</groupId> - 
<artifactId>scalatest-maven-plugin</artifactId> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/java/hivemall/xgboost/XGBoostOptions.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/java/hivemall/xgboost/XGBoostOptions.scala b/spark/spark-2.0/src/main/java/hivemall/xgboost/XGBoostOptions.scala deleted file mode 100644 index 48a773b..0000000 --- a/spark/spark-2.0/src/main/java/hivemall/xgboost/XGBoostOptions.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package hivemall.xgboost - -import scala.collection.mutable - -import org.apache.commons.cli.Options -import org.apache.spark.annotation.AlphaComponent - -/** - * :: AlphaComponent :: - * An utility class to generate a sequence of options used in XGBoost. 
- */ -@AlphaComponent -case class XGBoostOptions() { - private val params: mutable.Map[String, String] = mutable.Map.empty - private val options: Options = { - new XGBoostUDTF() { - def options(): Options = super.getOptions() - }.options() - } - - private def isValidKey(key: String): Boolean = { - // TODO: Is there another way to handle all the XGBoost options? - options.hasOption(key) || key == "num_class" - } - - def set(key: String, value: String): XGBoostOptions = { - require(isValidKey(key), s"non-existing key detected in XGBoost options: ${key}") - params.put(key, value) - this - } - - def help(): Unit = { - import scala.collection.JavaConversions._ - options.getOptions.map { case option => println(option) } - } - - override def toString(): String = { - params.map { case (key, value) => s"-$key $value" }.mkString(" ") - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/spark-2.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister deleted file mode 100644 index b49e20a..0000000 --- a/spark/spark-2.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.sql.hive.source.XGBoostFileFormat http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/resources/log4j.properties b/spark/spark-2.0/src/main/resources/log4j.properties deleted file mode 100644 index ef4f606..0000000 --- a/spark/spark-2.0/src/main/resources/log4j.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# 
Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.eclipse.jetty=INFO -log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala b/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala deleted file mode 100644 index 72a5c83..0000000 --- a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package hivemall.tools - -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.types._ - -object RegressionDatagen { - - /** - * Generate data for regression/classification. - * See [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]] - * for the details of arguments below. 
- */ - def exec(sc: SQLContext, - n_partitions: Int = 2, - min_examples: Int = 1000, - n_features: Int = 10, - n_dims: Int = 200, - seed: Int = 43, - dense: Boolean = false, - prob_one: Float = 0.6f, - sort: Boolean = false, - cl: Boolean = false): DataFrame = { - - require(n_partitions > 0, "Non-negative #n_partitions required.") - require(min_examples > 0, "Non-negative #min_examples required.") - require(n_features > 0, "Non-negative #n_features required.") - require(n_dims > 0, "Non-negative #n_dims required.") - - // Calculate #examples to generate in each partition - val n_examples = (min_examples + n_partitions - 1) / n_partitions - - val df = sc.createDataFrame( - sc.sparkContext.parallelize((0 until n_partitions).map(Row(_)), n_partitions), - StructType( - StructField("data", IntegerType, true) :: - Nil) - ) - import sc.implicits._ - df.lr_datagen( - lit(s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one" - + (if (dense) " -dense" else "") - + (if (sort) " -sort" else "") - + (if (cl) " -cl" else "")) - ).select($"label".cast(DoubleType).as("label"), $"features") - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/scala/org/apache/spark/sql/catalyst/expressions/EachTopK.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/catalyst/expressions/EachTopK.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/catalyst/expressions/EachTopK.scala deleted file mode 100644 index f1312ba..0000000 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/catalyst/expressions/EachTopK.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.catalyst.expressions - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.util.TypeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.util.BoundedPriorityQueue - -case class EachTopK( - k: Int, - groupingExpression: Expression, - scoreExpression: Expression, - children: Seq[Attribute]) extends Generator with CodegenFallback { - type QueueType = (AnyRef, InternalRow) - - require(k != 0, "`k` must not have 0") - - private[this] lazy val scoreType = scoreExpression.dataType - private[this] lazy val scoreOrdering = { - val ordering = TypeUtils.getInterpretedOrdering(scoreType) - .asInstanceOf[Ordering[AnyRef]] - if (k > 0) { - ordering - } else { - ordering.reverse - } - } - private[this] lazy val reverseScoreOrdering = scoreOrdering.reverse - - private[this] val queue: BoundedPriorityQueue[QueueType] = { - new BoundedPriorityQueue(Math.abs(k))(new Ordering[QueueType] { - override def compare(x: QueueType, y: QueueType): Int = - scoreOrdering.compare(x._1, y._1) - }) - } - - lazy private[this] val groupingProjection: UnsafeProjection = - UnsafeProjection.create(groupingExpression :: Nil, children) - - lazy private[this] val scoreProjection: UnsafeProjection = - 
UnsafeProjection.create(scoreExpression :: Nil, children) - - // The grouping key of the current partition - private[this] var currentGroupingKey: UnsafeRow = _ - - override def checkInputDataTypes(): TypeCheckResult = { - if (!TypeCollection.Ordered.acceptsType(scoreExpression.dataType)) { - TypeCheckResult.TypeCheckFailure( - s"$scoreExpression must have a comparable type") - } else { - TypeCheckResult.TypeCheckSuccess - } - } - - override def elementSchema: StructType = - StructType( - Seq(StructField("rank", IntegerType)) ++ - children.map(d => StructField(d.prettyName, d.dataType)) - ) - - private def topKRowsForGroup(): Seq[InternalRow] = if (queue.size > 0) { - val outputRows = queue.iterator.toSeq.reverse - val (headScore, _) = outputRows.head - val rankNum = outputRows.scanLeft((1, headScore)) { case ((rank, prevScore), (score, _)) => - if (prevScore == score) (rank, score) else (rank + 1, score) - } - outputRows.zip(rankNum.map(_._1)).map { case ((_, row), index) => - new JoinedRow(InternalRow(index), row) - } - } else { - Seq.empty - } - - override def eval(input: InternalRow): TraversableOnce[InternalRow] = { - val groupingKey = groupingProjection(input) - val ret = if (currentGroupingKey != groupingKey) { - val topKRows = topKRowsForGroup() - currentGroupingKey = groupingKey.copy() - queue.clear() - topKRows - } else { - Iterator.empty - } - queue += Tuple2(scoreProjection(input).get(0, scoreType), input.copy()) - ret - } - - override def terminate(): TraversableOnce[InternalRow] = { - if (queue.size > 0) { - val topKRows = topKRowsForGroup() - queue.clear() - topKRows - } else { - Iterator.empty - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala 
b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala deleted file mode 100644 index 7d2cd83..0000000 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.hive - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.RelationalGroupedDataset -import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.Aggregate -import org.apache.spark.sql.catalyst.plans.logical.Pivot -import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.types._ - -/** - * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. 
- * - * @groupname ensemble - * @groupname ftvec.trans - * @groupname evaluation - */ -final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { - - /** - * @see hivemall.ensemble.bagging.VotedAvgUDAF - * @group ensemble - */ - def voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) - val udaf = HiveUDAFFunction( - "voted_avg", - new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF - * @group ensemble - */ - def weight_voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) - val udaf = HiveUDAFFunction( - "weight_voted_avg", - new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.ArgminKLDistanceUDAF - * @group ensemble - */ - def argmin_kld(weight: String, conv: String): DataFrame = { - // checkType(weight, NumericType) - // checkType(conv, NumericType) - val udaf = HiveUDAFFunction( - "argmin_kld", - new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"), - Seq(weight, conv).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.ensemble.MaxValueLabelUDAF" - * @group ensemble - */ - def max_label(score: String, label: String): DataFrame = { - // checkType(score, NumericType) - checkType(label, StringType) - val udaf = HiveUDAFFunction( - "max_label", - new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"), - Seq(score, label).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: 
Nil).toSeq) - } - - /** - * @see hivemall.ensemble.MaxRowUDAF - * @group ensemble - */ - def maxrow(score: String, label: String): DataFrame = { - // checkType(score, NumericType) - checkType(label, StringType) - val udaf = HiveUDAFFunction( - "maxrow", - new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"), - Seq(score, label).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.smile.tools.RandomForestEnsembleUDAF - * @group ensemble - */ - def rf_ensemble(predict: String): DataFrame = { - // checkType(predict, NumericType) - val udaf = HiveUDAFFunction( - "rf_ensemble", - new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"), - Seq(predict).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.tools.matrix.TransposeAndDotUDAF - */ - def transpose_and_dot(X: String, Y: String): DataFrame = { - val udaf = HiveUDAFFunction( - "transpose_and_dot", - new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"), - Seq(X, Y).map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) - } - - /** - * @see hivemall.ftvec.trans.OnehotEncodingUDAF - * @group ftvec.trans - */ - def onehot_encoding(cols: String*): DataFrame = { - val udaf = HiveUDAFFunction( - "onehot_encoding", - new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"), - cols.map(df.col(_).expr), - isUDAFBridgeRequired = false) - .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) - } - - /** - * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF - */ - def snr(X: String, Y: String): DataFrame = { - val udaf = HiveUDAFFunction( - "snr", - new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"), - Seq(X, Y).map(df.col(_).expr), - isUDAFBridgeRequired = 
false) - .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) - } - - /** - * @see hivemall.evaluation.MeanAbsoluteErrorUDAF - * @group evaluation - */ - def mae(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - "mae", - new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.MeanSquareErrorUDAF - * @group evaluation - */ - def mse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - "mse", - new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.RootMeanSquareErrorUDAF - * @group evaluation - */ - def rmse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) - val udaf = HiveUDAFFunction( - "rmse", - new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) - } - - /** - * @see hivemall.evaluation.F1ScoreUDAF - * @group evaluation - */ - def f1score(predict: String, target: String): DataFrame = { - // checkType(target, ArrayType(IntegerType)) - // checkType(predict, ArrayType(IntegerType)) - val udaf = HiveUDAFFunction( - "f1score", - new HiveFunctionWrapper("hivemall.evaluation.F1ScoreUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) - .toAggregateExpression() - toDF((Alias(udaf, 
udaf.prettyName)() :: Nil).toSeq) - } - - /** - * [[RelationalGroupedDataset]] has the three values as private fields, so, to inject Hivemall - * aggregate functions, we fetch them via Java Reflections. - */ - private val df = getPrivateField[DataFrame]("org$apache$spark$sql$RelationalGroupedDataset$$df") - private val groupingExprs = getPrivateField[Seq[Expression]]("groupingExprs") - private val groupType = getPrivateField[RelationalGroupedDataset.GroupType]("groupType") - - private def getPrivateField[T](name: String): T = { - val field = groupBy.getClass.getDeclaredField(name) - field.setAccessible(true) - field.get(groupBy).asInstanceOf[T] - } - - private def toDF(aggExprs: Seq[Expression]): DataFrame = { - val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { - groupingExprs ++ aggExprs - } else { - aggExprs - } - - val aliasedAgg = aggregates.map(alias) - - groupType match { - case RelationalGroupedDataset.GroupByType => - Dataset.ofRows( - df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.RollupType => - Dataset.ofRows( - df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.CubeType => - Dataset.ofRows( - df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.PivotType(pivotCol, values) => - val aliasedGrps = groupingExprs.map(alias) - Dataset.ofRows( - df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan)) - } - } - - private def alias(expr: Expression): NamedExpression = expr match { - case u: UnresolvedAttribute => UnresolvedAlias(u) - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyName)() - } - - private def checkType(colName: String, expected: DataType) = { - val dataType = df.resolve(colName).dataType - if (dataType != expected) { - throw new AnalysisException( - s""""$colName" must be $expected, 
however it is $dataType""") - } - } -} - -object HivemallGroupedDataset { - - /** - * Implicitly inject the [[HivemallGroupedDataset]] into [[RelationalGroupedDataset]]. - */ - implicit def relationalGroupedDatasetToHivemallOne( - groupBy: RelationalGroupedDataset): HivemallGroupedDataset = { - new HivemallGroupedDataset(groupBy) - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala deleted file mode 100644 index b5299ef..0000000 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ /dev/null @@ -1,1381 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.spark.sql.hive - -import java.util.UUID - -import org.apache.spark.annotation.Experimental -import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.HivemallFeature -import org.apache.spark.ml.linalg.{DenseVector, SparseVector, VectorUDT} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal, NamedExpression, UserDefinedGenerator} -import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -/** - * Hivemall wrapper and some utility functions for DataFrame. - * - * @groupname regression - * @groupname classifier - * @groupname classifier.multiclass - * @groupname xgboost - * @groupname anomaly - * @groupname knn.similarity - * @groupname knn.distance - * @groupname knn.lsh - * @groupname ftvec - * @groupname ftvec.amplify - * @groupname ftvec.hashing - * @groupname ftvec.scaling - * @groupname ftvec.conv - * @groupname ftvec.trans - * @groupname misc - */ -final class HivemallOps(df: DataFrame) extends Logging { - import internal.HivemallOpsImpl._ - - private[this] val _sparkSession = df.sparkSession - private[this] val _analyzer = _sparkSession.sessionState.analyzer - - /** - * @see [[hivemall.regression.AdaDeltaUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_adadelta(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.AdaDeltaUDTF", - "train_adadelta", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.regression.AdaGradUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_adagrad(exprs: Column*): 
DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.AdaGradUDTF", - "train_adagrad", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.regression.AROWRegressionUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.AROWRegressionUDTF", - "train_arow_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]] - * @group regression - */ - @scala.annotation.varargs - def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.AROWRegressionUDTF$AROWe", - "train_arowe_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]] - * @group regression - */ - @scala.annotation.varargs - def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.AROWRegressionUDTF$AROWe2", - "train_arowe2_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.regression.LogressUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_logregr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.LogressUDTF", - "train_logregr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.PassiveAggressiveRegressionUDTF", - "train_pa1_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", 
"weight") - ) - } - - /** - * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]] - * @group regression - */ - @scala.annotation.varargs - def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a", - "train_pa1a_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]] - * @group regression - */ - @scala.annotation.varargs - def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2", - "train_pa2_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]] - * @group regression - */ - @scala.annotation.varargs - def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a", - "train_pa2a_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]] - * @group regression - */ - @scala.annotation.varargs - def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.smile.regression.RandomForestRegressionUDTF", - "train_randomforest_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") - ) - } - - /** - * @see [[hivemall.classifier.PerceptronUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_perceptron(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.PerceptronUDTF", - "train_perceptron", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - 
* @see [[hivemall.classifier.PassiveAggressiveUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_pa(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.PassiveAggressiveUDTF", - "train_pa", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]] - * @group classifier - */ - @scala.annotation.varargs - def train_pa1(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.PassiveAggressiveUDTF$PA1", - "train_pa1", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]] - * @group classifier - */ - @scala.annotation.varargs - def train_pa2(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.PassiveAggressiveUDTF$PA2", - "train_pa2", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.ConfidenceWeightedUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_cw(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.ConfidenceWeightedUDTF", - "train_cw", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.AROWClassifierUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_arow(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.AROWClassifierUDTF", - "train_arow", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]] - * @group classifier - */ - @scala.annotation.varargs - def train_arowh(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - 
"hivemall.classifier.AROWClassifierUDTF$AROWh", - "train_arowh", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]] - * @group classifier - */ - @scala.annotation.varargs - def train_scw(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1", - "train_scw", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]] - * @group classifier - */ - @scala.annotation.varargs - def train_scw2(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2", - "train_scw2", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.AdaGradRDAUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.AdaGradRDAUDTF", - "train_adagrad_rda", - setMixServs(toHivemallFeatures(exprs)), - Seq("feature", "weight") - ) - } - - /** - * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]] - * @group classifier - */ - @scala.annotation.varargs - def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.smile.classification.RandomForestClassifierUDTF", - "train_randomforest_classifier", - setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_perceptron(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - 
"hivemall.classifier.multiclass.MulticlassPerceptronUDTF", - "train_multiclass_perceptron", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF", - "train_multiclass_pa", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1", - "train_multiclass_pa1", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2", - "train_multiclass_pa2", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF", - "train_multiclass_cw", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight", "conv") - ) - } - - /** - * @see 
[[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF", - "train_multiclass_arow", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1", - "train_multiclass_scw", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight", "conv") - ) - } - - /** - * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]] - * @group classifier.multiclass - */ - @scala.annotation.varargs - def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2", - "train_multiclass_scw2", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "feature", "weight", "conv") - ) - } - - /** - * :: Experimental :: - * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]] - * @group xgboost - */ - @Experimental - @scala.annotation.varargs - def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.xgboost.regression.XGBoostRegressionUDTF", - "train_xgboost_regr", - setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "pred_model") - ) - } - - /** - * :: Experimental :: - * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]] - * @group xgboost - */ - @Experimental - @scala.annotation.varargs - def train_xgboost_classifier(exprs: Column*): DataFrame = 
withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF", - "train_xgboost_classifier", - setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "pred_model") - ) - } - - /** - * :: Experimental :: - * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]] - * @group xgboost - */ - @Experimental - @scala.annotation.varargs - def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF", - "train_xgboost_multiclass_classifier", - setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "pred_model") - ) - } - - /** - * :: Experimental :: - * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]] - * @group xgboost - */ - @Experimental - @scala.annotation.varargs - def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.xgboost.tools.XGBoostPredictUDTF", - "xgboost_predict", - setMixServs(toHivemallFeatures(exprs)), - Seq("rowid", "predicted") - ) - } - - /** - * :: Experimental :: - * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]] - * @group xgboost - */ - @Experimental - @scala.annotation.varargs - def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF", - "xgboost_multiclass_predict", - setMixServs(toHivemallFeatures(exprs)), - Seq("rowid", "label", "probability") - ) - } - - /** - * @see [[hivemall.knn.lsh.MinHashUDTF]] - * @group knn.lsh - */ - @scala.annotation.varargs - def minhash(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.knn.lsh.MinHashUDTF", - "minhash", - setMixServs(toHivemallFeatures(exprs)), - Seq("clusterid", "item") - ) - } - - /** - * @see [[hivemall.ftvec.amplify.AmplifierUDTF]] - * @group ftvec.amplify - */ - @scala.annotation.varargs - def 
amplify(exprs: Column*): DataFrame = withTypedPlan { - val outputAttr = exprs.drop(1).map { - case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name) - case Column(expr: Expression) => UnresolvedAttribute(expr.simpleString) - } - planHiveGenericUDTF( - df, - "hivemall.ftvec.amplify.AmplifierUDTF", - "amplify", - setMixServs(toHivemallFeatures(exprs)), - Seq("clusterid", "item") - ) - } - - /** - * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]] - * @group ftvec.amplify - */ - @scala.annotation.varargs - def rand_amplify(exprs: Column*): DataFrame = withTypedPlan { - throw new UnsupportedOperationException("`rand_amplify` not supported yet") - } - - /** - * Amplifies and shuffle data inside partitions. - * @group ftvec.amplify - */ - def part_amplify(xtimes: Column): DataFrame = { - val xtimesInt = xtimes.expr match { - case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] - case e => throw new AnalysisException("`xtimes` must be integer, however " + e) - } - val rdd = df.rdd.mapPartitions({ iter => - val elems = iter.flatMap{ row => - Seq.fill[Row](xtimesInt)(row) - } - // Need to check how this shuffling affects results - scala.util.Random.shuffle(elems) - }, true) - df.sqlContext.createDataFrame(rdd, df.schema) - } - - /** - * Quantifies input columns. 
- * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]] - * @group ftvec.conv - */ - @scala.annotation.varargs - def quantify(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.ftvec.conv.QuantifyColumnsUDTF", - "quantify", - setMixServs(toHivemallFeatures(exprs)), - (0 until exprs.size - 1).map(i => s"c$i") - ) - } - - /** - * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]] - * @group ftvec.trans - */ - @scala.annotation.varargs - def binarize_label(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.ftvec.trans.BinarizeLabelUDTF", - "binarize_label", - setMixServs(toHivemallFeatures(exprs)), - (0 until exprs.size - 1).map(i => s"c$i") - ) - } - - /** - * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]] - * @group ftvec.trans - */ - @scala.annotation.varargs - def quantified_features(exprs: Column*): DataFrame = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.ftvec.trans.QuantifiedFeaturesUDTF", - "quantified_features", - setMixServs(toHivemallFeatures(exprs)), - Seq("features") - ) - } - - /** - * Splits Seq[String] into pieces. - * @group ftvec - */ - def explode_array(expr: Column): DataFrame = { - df.explode(expr) { case Row(v: Seq[_]) => - // Type erasure removes the component type in Seq - v.map(s => HivemallFeature(s.asInstanceOf[String])) - } - } - - /** - * Splits [[Vector]] into pieces. 
- * @group ftvec - */ - def explode_vector(expr: Column): DataFrame = { - val elementSchema = StructType( - StructField("feature", StringType) :: StructField("weight", DoubleType) :: Nil) - val explodeFunc: Row => TraversableOnce[InternalRow] = (row: Row) => { - row.get(0) match { - case dv: DenseVector => - dv.values.zipWithIndex.map { - case (value, index) => - InternalRow(UTF8String.fromString(s"$index"), value) - } - case sv: SparseVector => - sv.values.zip(sv.indices).map { - case (value, index) => - InternalRow(UTF8String.fromString(s"$index"), value) - } - } - } - withTypedPlan { - Generate( - UserDefinedGenerator(elementSchema, explodeFunc, expr.expr :: Nil), - join = true, outer = false, None, - generatorOutput = Nil, - df.logicalPlan) - } - } - - /** - * Returns `top-k` records for each `group`. - * @group misc - */ - def each_top_k(k: Column, group: Column, score: Column): DataFrame = withTypedPlan { - val kInt = k.expr match { - case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] - case e => throw new AnalysisException("`k` must be integer, however " + e) - } - val clusterDf = df.repartition(group).sortWithinPartitions(group) - val child = clusterDf.logicalPlan - val logicalPlan = Project(group.named +: score.named +: child.output, child) - _analyzer.execute(logicalPlan) match { - case Project(group :: score :: origCols, c) => - Generate( - EachTopK(kInt, group, score, c.output), - join = false, outer = false, None, - (Seq("rank") ++ origCols.map(_.name)).map(UnresolvedAttribute(_)), - clusterDf.logicalPlan - ) - } - } - - /** - * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]] - * @group misc - */ - @scala.annotation.varargs - def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan { - planHiveGenericUDTF( - df, - "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper", - "lr_datagen", - setMixServs(toHivemallFeatures(exprs)), - Seq("label", "features") - ) - } - - /** - * Returns all the columns as Seq[Column] in this 
[[DataFrame]]. - */ - private[sql] def cols: Seq[Column] = { - df.schema.fields.map(col => df.col(col.name)).toSeq - } - - /** - * :: Experimental :: - * If a parameter '-mix' does not exist in a 3rd argument, - * set it from an environmental variable - * 'HIVEMALL_MIX_SERVERS'. - * - * TODO: This could work if '--deploy-mode' has 'client'; - * otherwise, we need to set HIVEMALL_MIX_SERVERS - * in all possible spark workers. - */ - @Experimental - private[this] def setMixServs(exprs: Seq[Column]): Seq[Column] = { - val mixes = System.getenv("HIVEMALL_MIX_SERVERS") - if (mixes != null && !mixes.isEmpty()) { - val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID - logInfo(s"set '${mixes}' as default mix servers (session: ${groupId})") - exprs.size match { - case 2 => exprs :+ Column( - Literal.create(s"-mix ${mixes} -mix_session ${groupId}", StringType)) - /** TODO: Add codes in the case where exprs.size == 3. */ - case _ => exprs - } - } else { - exprs - } - } - - /** - * If the input is a [[Vector]], transform it into Hivemall features. - */ - @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = { - df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map { - case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c) - case (_, c) => c - } - } - - /** - * A convenient function to wrap a logical plan and produce a DataFrame. - */ - @inline private[this] def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = { - val queryExecution = df.sparkSession.sessionState.executePlan(logicalPlan) - val outputSchema = queryExecution.sparkPlan.schema - new Dataset[Row](df.sparkSession, queryExecution, RowEncoder(outputSchema)) - } -} - -object HivemallOps { - import internal.HivemallOpsImpl._ - - /** - * Implicitly inject the [[HivemallOps]] into [[DataFrame]]. 
- */ - implicit def dataFrameToHivemallOps(df: DataFrame): HivemallOps = - new HivemallOps(df) - - /** - * @see [[hivemall.HivemallVersionUDF]] - * @group misc - */ - def hivemall_version(): Column = withExpr { - planHiveUDF( - "hivemall.HivemallVersionUDF", - "hivemall_version", - Nil - ) - } - - /** - * @see [[hivemall.anomaly.ChangeFinderUDF]] - * @group anomaly - */ - @scala.annotation.varargs - def changefinder(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.anomaly.ChangeFinderUDF", - "changefinder", - exprs - ) - } - - /** - * @see [[hivemall.anomaly.SingularSpectrumTransformUDF]] - * @group anomaly - */ - @scala.annotation.varargs - def sst(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.anomaly.SingularSpectrumTransformUDF", - "sst", - exprs - ) - } - - /** - * @see [[hivemall.knn.similarity.CosineSimilarityUDF]] - * @group knn.similarity - */ - @scala.annotation.varargs - def cosine_sim(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.similarity.CosineSimilarityUDF", - "cosine_sim", - exprs - ) - } - - /** - * @see [[hivemall.knn.similarity.JaccardIndexUDF]] - * @group knn.similarity - */ - @scala.annotation.varargs - def jaccard(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.knn.similarity.JaccardIndexUDF", - "jaccard", - exprs - ) - } - - /** - * @see [[hivemall.knn.similarity.AngularSimilarityUDF]] - * @group knn.similarity - */ - @scala.annotation.varargs - def angular_similarity(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.similarity.AngularSimilarityUDF", - "angular_similarity", - exprs - ) - } - - /** - * @see [[hivemall.knn.similarity.EuclidSimilarity]] - * @group knn.similarity - */ - @scala.annotation.varargs - def euclid_similarity(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.similarity.EuclidSimilarity", - "euclid_similarity", - exprs - ) - } - - /** - * @see 
[[hivemall.knn.similarity.Distance2SimilarityUDF]] - * @group knn.similarity - */ - @scala.annotation.varargs - def distance2similarity(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveGenericUDF( - "hivemall.knn.similarity.Distance2SimilarityUDF", - "distance2similarity", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.HammingDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def hamming_distance(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.knn.distance.HammingDistanceUDF", - "hamming_distance", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.PopcountUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def popcnt(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.knn.distance.PopcountUDF", - "popcnt", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.KLDivergenceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def kld(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.knn.distance.KLDivergenceUDF", - "kld", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.EuclidDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def euclid_distance(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.distance.EuclidDistanceUDF", - "euclid_distance", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.CosineDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def cosine_distance(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.distance.CosineDistanceUDF", - "cosine_distance", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.AngularDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def angular_distance(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.distance.AngularDistanceUDF", - "angular_distance", - exprs - ) - } - - /** - * @see 
[[hivemall.knn.distance.ManhattanDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def manhattan_distance(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.distance.ManhattanDistanceUDF", - "manhattan_distance", - exprs - ) - } - - /** - * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]] - * @group knn.distance - */ - @scala.annotation.varargs - def minkowski_distance (exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.distance.MinkowskiDistanceUDF", - "minkowski_distance", - exprs - ) - } - - /** - * @see [[hivemall.knn.lsh.bBitMinHashUDF]] - * @group knn.lsh - */ - @scala.annotation.varargs - def bbit_minhash(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.knn.lsh.bBitMinHashUDF", - "bbit_minhash", - exprs - ) - } - - /** - * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]] - * @group knn.lsh - */ - @scala.annotation.varargs - def minhashes(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.knn.lsh.MinHashesUDFWrapper", - "minhashes", - exprs - ) - } - - /** - * Returns new features with `1.0` (bias) appended to the input features. - * @see [[hivemall.ftvec.AddBiasUDFWrapper]] - * @group ftvec - */ - def add_bias(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.AddBiasUDFWrapper", - "add_bias", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]] - * @group ftvec - * - * TODO: This throws java.lang.ClassCastException because - * HiveInspectors.toInspector has a bug in spark. - * Need to fix it later. - */ - def extract_feature(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.ExtractFeatureUDFWrapper", - "extract_feature", - expr :: Nil - ) - }.as("feature") - - /** - * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]] - * @group ftvec - * - * TODO: This throws java.lang.ClassCastException because - * HiveInspectors.toInspector has a bug in spark. - * Need to fix it later. 
- */ - def extract_weight(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.ExtractWeightUDFWrapper", - "extract_weight", - expr :: Nil - ) - }.as("value") - - /** - * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]] - * @group ftvec - */ - def add_feature_index(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.AddFeatureIndexUDFWrapper", - "add_feature_index", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]] - * @group ftvec - */ - def sort_by_feature(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.SortByFeatureUDFWrapper", - "sort_by_feature", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]] - * @group ftvec.hashing - */ - def mhash(expr: Column): Column = withExpr { - planHiveUDF( - "hivemall.ftvec.hashing.MurmurHash3UDF", - "mhash", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.hashing.Sha1UDF]] - * @group ftvec.hashing - */ - def sha1(expr: Column): Column = withExpr { - planHiveUDF( - "hivemall.ftvec.hashing.Sha1UDF", - "sha1", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]] - * @group ftvec.hashing - */ - @scala.annotation.varargs - def array_hash_values(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveUDF( - "hivemall.ftvec.hashing.ArrayHashValuesUDF", - "array_hash_values", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]] - * @group ftvec.hashing - */ - @scala.annotation.varargs - def prefixed_hash_values(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveUDF( - "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF", - "prefixed_hash_values", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.scaling.RescaleUDF]] - * @group ftvec.scaling - */ - def rescale(value: Column, max: Column, min: Column): Column = 
withExpr { - planHiveUDF( - "hivemall.ftvec.scaling.RescaleUDF", - "rescale", - value.cast(FloatType) :: max :: min :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.scaling.ZScoreUDF]] - * @group ftvec.scaling - */ - @scala.annotation.varargs - def zscore(exprs: Column*): Column = withExpr { - planHiveUDF( - "hivemall.ftvec.scaling.ZScoreUDF", - "zscore", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]] - * @group ftvec.scaling - */ - def normalize(expr: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.scaling.L2NormalizationUDFWrapper", - "normalize", - expr :: Nil - ) - } - - /** - * @see [[hivemall.ftvec.selection.ChiSquareUDF]] - * @group ftvec.selection - */ - def chi2(observed: Column, expected: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.selection.ChiSquareUDF", - "chi2", - Seq(observed, expected) - ) - } - - /** - * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]] - * @group ftvec.conv - */ - @scala.annotation.varargs - def to_dense_features(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveGenericUDF( - "hivemall.ftvec.conv.ToDenseFeaturesUDF", - "to_dense_features", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]] - * @group ftvec.conv - */ - @scala.annotation.varargs - def to_sparse_features(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveGenericUDF( - "hivemall.ftvec.conv.ToSparseFeaturesUDF", - "to_sparse_features", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]] - * @group ftvec.trans - */ - @scala.annotation.varargs - def vectorize_features(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.trans.VectorizeFeaturesUDF", - "vectorize_features", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]] - * @group ftvec.trans - */ - 
@scala.annotation.varargs - def categorical_features(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.trans.CategoricalFeaturesUDF", - "categorical_features", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.trans.IndexedFeatures]] - * @group ftvec.trans - */ - @scala.annotation.varargs - def indexed_features(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.trans.IndexedFeatures", - "indexed_features", - exprs - ) - } - - /** - * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]] - * @group ftvec.trans - */ - @scala.annotation.varargs - def quantitative_features(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.ftvec.trans.QuantitativeFeaturesUDF", - "quantitative_features", - exprs - ) - } - - /** - * @see [[hivemall.smile.tools.TreePredictUDF]] - * @group misc - */ - @scala.annotation.varargs - def tree_predict(exprs: Column*): Column = withExpr { - planHiveGenericUDF( - "hivemall.smile.tools.TreePredictUDF", - "tree_predict", - exprs - ) - } - - /** - * @see [[hivemall.tools.array.SelectKBestUDF]] - * @group tools.array - */ - def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr { - planHiveGenericUDF( - "hivemall.tools.array.SelectKBestUDF", - "select_k_best", - Seq(X, importanceList, k) - ) - } - - /** - * @see [[hivemall.tools.math.SigmoidGenericUDF]] - * @group misc - */ - def sigmoid(expr: Column): Column = { - val one: () => Literal = () => Literal.create(1.0, DoubleType) - Column(one()) / (Column(one()) + exp(-expr)) - } - - /** - * @see [[hivemall.tools.mapred.RowIdUDFWrapper]] - * @group misc - */ - def rowid(): Column = withExpr { - planHiveGenericUDF( - "hivemall.tools.mapred.RowIdUDFWrapper", - "rowid", - Nil - ) - }.as("rowid") - - /** - * A convenient function to wrap an expression and produce a Column. - */ - @inline private def withExpr(expr: Expression): Column = Column(expr) -}
