spark git commit: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelectCommand
Repository: spark Updated Branches: refs/heads/master 93f35569f -> 7d19b6ab7 [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelectCommand ## What changes were proposed in this pull request? The `CreateDataSourceTableAsSelectCommand` is quite complex now, as it has a lot of work to do if the table already exists: 1. throw an exception if we don't want to ignore it. 2. do some checks and adjust the schema if we want to append data. 3. drop the table and create it again if we want to overwrite. Steps 2 and 3 should be done by the analyzer, so that we can also apply them to Hive tables. ## How was this patch tested? Existing tests. Author: Wenchen Fan Closes #15996 from cloud-fan/append. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d19b6ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d19b6ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d19b6ab Branch: refs/heads/master Commit: 7d19b6ab7d75b95d9eb1c7e1f228d23fd482306e Parents: 93f3556 Author: Wenchen Fan Authored: Wed Dec 28 21:50:21 2016 -0800 Committer: Yin Huai Committed: Wed Dec 28 21:50:21 2016 -0800 -- .../org/apache/spark/sql/DataFrameWriter.scala | 78 + .../command/createDataSourceTables.scala| 167 +-- .../spark/sql/execution/datasources/rules.scala | 164 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 2 +- 4 files changed, 213 insertions(+), 198 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d19b6ab/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9c5660a..405f38a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -23,11 +23,12 @@ import scala.collection.JavaConverters._ import 
org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType /** @@ -364,7 +365,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } -val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) +val catalog = df.sparkSession.sessionState.catalog +val tableExists = catalog.tableExists(tableIdent) +val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) +val tableIdentWithDB = tableIdent.copy(database = Some(db)) +val tableName = tableIdentWithDB.unquotedString (tableExists, mode) match { case (true, SaveMode.Ignore) => @@ -373,39 +378,48 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case (true, SaveMode.ErrorIfExists) => throw new AnalysisException(s"Table $tableIdent already exists.") - case _ => -val existingTable = if (tableExists) { - Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent)) -} else { - None + case (true, SaveMode.Overwrite) => +// Get all input data source relations of the query. 
+val srcRelations = df.logicalPlan.collect { + case LogicalRelation(src: BaseRelation, _, _) => src } -val storage = if (tableExists) { - existingTable.get.storage -} else { - DataSource.buildStorageFormatFromOptions(extraOptions.toMap) -} -val tableType = if (tableExists) { - existingTable.get.tableType -} else if (storage.locationUri.isDefined) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED +EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match { + // Only do the check if the table is a data source table (the relation is a BaseRelation). + case LogicalRelation(dest: BaseRelation, _, _) if
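The `(tableExists, mode)` match in `saveAsTable` above dispatches on whether the table exists and which save mode was requested, following the three cases the commit message lists. The Python sketch below models that dispatch as a small decision table. It is a hypothetical illustration, not Spark code. `SaveMode` here only mirrors Spark's mode names. `plan_save_as_table`, `TableAlreadyExistsError` and the returned action strings are invented for this example. The sketch also leaves out the overwrite-time checks, because the diff above is truncated.

```python
from enum import Enum


class SaveMode(Enum):
    # Mirrors the names of Spark's four save modes; this is not the Spark API.
    APPEND = "Append"
    OVERWRITE = "Overwrite"
    ERROR_IF_EXISTS = "ErrorIfExists"
    IGNORE = "Ignore"


class TableAlreadyExistsError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException in this sketch."""


def plan_save_as_table(table_exists: bool, mode: SaveMode) -> str:
    """Return the (hypothetical) action taken for a given (exists, mode) pair."""
    if not table_exists:
        # No existing table: every mode simply creates it from the query.
        return "create"
    if mode is SaveMode.IGNORE:
        # Leave the existing table untouched.
        return "no-op"
    if mode is SaveMode.ERROR_IF_EXISTS:
        raise TableAlreadyExistsError("Table already exists.")
    if mode is SaveMode.OVERWRITE:
        # Commit message, step 3: drop the table and create it again.
        return "drop-and-create"
    # Remaining case is APPEND. Commit message, step 2: check and adjust the
    # schema, then insert into the existing table.
    return "append"
```

When the table does not exist, the mode does not matter: every branch ends in creating the table.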
[06/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/sql-programming-guide.html -- diff --git a/site/docs/2.1.0/sql-programming-guide.html b/site/docs/2.1.0/sql-programming-guide.html index 17f5981..4534a98 100644 --- a/site/docs/2.1.0/sql-programming-guide.html +++ b/site/docs/2.1.0/sql-programming-guide.html @@ -127,95 +127,95 @@ - Overview - SQL - Datasets and DataFrames + Overview + SQL + Datasets and DataFrames - Getting Started - Starting Point: SparkSession - Creating DataFrames - Untyped Dataset Operations (aka DataFrame Operations) - Running SQL Queries Programmatically - Global Temporary View - Creating Datasets - Interoperating with RDDs - Inferring the Schema Using Reflection - Programmatically Specifying the Schema + Getting Started + Starting Point: SparkSession + Creating DataFrames + Untyped Dataset Operations (aka DataFrame Operations) + Running SQL Queries Programmatically + Global Temporary View + Creating Datasets + Interoperating with RDDs + Inferring the Schema Using Reflection + Programmatically Specifying the Schema - Data Sources - Generic Load/Save Functions - Manually Specifying Options - Run SQL on files directly - Save Modes - Saving to Persistent Tables + Data Sources + Generic Load/Save Functions + Manually Specifying Options + Run SQL on files directly + Save Modes + Saving to Persistent Tables - Parquet Files - Loading Data Programmatically - Partition Discovery - Schema Merging - Hive metastore Parquet table conversion - Hive/Parquet Schema Reconciliation - Metadata Refreshing + Parquet Files + Loading Data Programmatically + Partition Discovery + Schema Merging + Hive metastore Parquet table conversion + Hive/Parquet Schema Reconciliation + Metadata Refreshing - Configuration + Configuration - JSON Datasets - Hive Tables - Interacting with Different Versions of Hive Metastore + JSON Datasets + Hive Tables + Interacting with Different Versions of Hive Metastore - JDBC To Other Databases - 
Troubleshooting + JDBC To Other Databases + Troubleshooting - Performance Tuning - Caching Data In Memory - Other Configuration Options + Performance Tuning + Caching Data In Memory + Other Configuration Options - Distributed SQL Engine - Running the Thrift JDBC/ODBC server - Running the Spark SQL CLI + Distributed SQL Engine + Running the Thrift JDBC/ODBC server + Running the Spark SQL CLI - Migration Guide - Upgrading From Spark SQL 2.0 to 2.1 - Upgrading From Spark SQL 1.6 to 2.0 - Upgrading From Spark SQL 1.5 to 1.6 - Upgrading From Spark SQL 1.4 to 1.5 - Upgrading from Spark SQL 1.3 to 1.4 - DataFrame data reader/writer interface - DataFrame.groupBy retains grouping columns - Behavior change on DataFrame.withColumn + Migration Guide + Upgrading From Spark SQL 2.0 to 2.1 + Upgrading From Spark SQL 1.6 to 2.0 + Upgrading From Spark SQL 1.5 to 1.6 + Upgrading From Spark SQL 1.4 to 1.5 + Upgrading from Spark SQL 1.3 to 1.4 + DataFrame data reader/writer interface + DataFrame.groupBy retains grouping columns + Behavior change on DataFrame.withColumn - Upgrading from Spark SQL 1.0-1.2 to 1.3 - Rename of SchemaRDD to DataFrame - Unification of the Java and Scala APIs - Isolation of Implicit Conversions and Removal of dsl Package (Scala-only) - Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only) - UDF Registration Moved to sqlContext.udf (Java & Scala) - Python DataTypes No Longer Singletons + Upgrading from Spark SQL 1.0-1.2 to 1.3 + Rename of SchemaRDD to DataFrame + Unification of the Java and Scala APIs + Isolation of Implicit Conversions and Removal of dsl Package (Scala-only) + Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only) + UDF Registration Moved to sqlContext.udf (Java & Scala) + Python DataTypes No Longer
[19/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-migration-guides.html -- diff --git a/site/docs/2.1.0/ml-migration-guides.html b/site/docs/2.1.0/ml-migration-guides.html index 5e8a913..24dfc31 100644 --- a/site/docs/2.1.0/ml-migration-guides.html +++ b/site/docs/2.1.0/ml-migration-guides.html @@ -344,21 +344,21 @@ for converting to mllib.linalg types. -import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.util.MLUtils // convert DataFrame columns val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) // convert a single vector or matrix val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML -val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML +val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML Refer to the MLUtils Scala docs for further detail. -import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.sql.Dataset; // convert DataFrame columns @@ -366,21 +366,21 @@ for converting to mllib.linalg types. Dataset<Row> convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF); // convert a single vector or matrix org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML(); -org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML(); +org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML(); Refer to the MLUtils Java docs for further detail. -from pyspark.mllib.util import MLUtils +from pyspark.mllib.util import MLUtils -# convert DataFrame columns +# convert DataFrame columns convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) -# convert a single vector or matrix +# convert a single vector or matrix mlVec = mllibVec.asML() -mlMat = mllibMat.asML() +mlMat = mllibMat.asML() Refer to the MLUtils Python docs for further detail.
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-pipeline.html -- diff --git a/site/docs/2.1.0/ml-pipeline.html b/site/docs/2.1.0/ml-pipeline.html index fe17564..b57afde 100644 --- a/site/docs/2.1.0/ml-pipeline.html +++ b/site/docs/2.1.0/ml-pipeline.html @@ -331,27 +331,27 @@ machine learning pipelines. Table of Contents - Main concepts in Pipelines - DataFrame - Pipeline components - Transformers - Estimators - Properties of pipeline components + Main concepts in Pipelines + DataFrame + Pipeline components + Transformers + Estimators + Properties of pipeline components - Pipeline - How it works - Details + Pipeline + How it works + Details - Parameters - Saving and Loading Pipelines + Parameters + Saving and Loading Pipelines - Code examples - Example: Estimator, Transformer, and Param - Example: Pipeline - Model selection (hyperparameter tuning) + Code examples + Example: Estimator, Transformer, and Param + Example: Pipeline + Model selection (hyperparameter tuning) @@ -541,7 +541,7 @@ Refer to the [`Estimator` Scala docs](api/scala/index.html#org.apache.spark.ml.E the [`Transformer` Scala docs](api/scala/index.html#org.apache.spark.ml.Transformer) and the [`Params` Scala docs](api/scala/index.html#org.apache.spark.ml.param.Params) for details on the API. 
-import org.apache.spark.ml.classification.LogisticRegression +import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.Row @@ -601,7 +601,7 @@ the [`Params` Scala docs](api/scala/index.html#org.apache.spark.ml.param.Params) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => -println(s"($features, $label) -> prob=$prob, prediction=$prediction") +println(s"($features, $label) -> prob=$prob, prediction=$prediction") } Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala" in the Spark repo. @@ -612,7 +612,7 @@ Refer to the [`Estimator` Java docs](api/java/org/apache/spark/ml/Estimator.html the [`Transformer` Java docs](api/java/org/apache/spark/ml/Transformer.html) and the [`Params` Java docs](api/java/org/apache/spark/ml/param/Params.html) for details on the API. -import java.util.Arrays; +import java.util.Arrays; import java.util.List; import
[01/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
Repository: spark-website Updated Branches: refs/heads/asf-site ecf94f284 -> d2bcf1854 http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/submitting-applications.html -- diff --git a/site/docs/2.1.0/submitting-applications.html b/site/docs/2.1.0/submitting-applications.html index fc18fa9..0c91739 100644 --- a/site/docs/2.1.0/submitting-applications.html +++ b/site/docs/2.1.0/submitting-applications.html @@ -151,14 +151,14 @@ packaging them into a .zip or .egg. This script takes care of setting up the classpath with Spark and its dependencies, and can support different cluster managers and deploy modes that Spark supports: -./bin/spark-submit \ +./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ - ... # other options + ... # other options <application-jar> \ - [application-arguments] + [application-arguments] Some of the commonly used options are: @@ -194,23 +194,23 @@ you can also specify --supervise to make sure that the driver is au fails with non-zero exit code. To enumerate all such options available to spark-submit, run it with --help.
Here are a few examples of common options: -# Run application locally on 8 cores +# Run application locally on 8 cores ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ - --master local[8] \ + --master local[8] \ /path/to/examples.jar \ - 100 + 100 -# Run on a Spark standalone cluster in client deploy mode +# Run on a Spark standalone cluster in client deploy mode ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ - 1000 + 1000 -# Run on a Spark standalone cluster in cluster deploy mode with supervise +# Run on a Spark standalone cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ @@ -219,26 +219,26 @@ run it with --help. Here are a few examples of common options: --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ - 1000 + 1000 -# Run on a YARN cluster -export HADOOP_CONF_DIR=XXX +# Run on a YARN cluster +export HADOOP_CONF_DIR=XXX ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ - --deploy-mode cluster \ # can be client for client mode + --deploy-mode cluster \ # can be client for client mode --executor-memory 20G \ --num-executors 50 \ /path/to/examples.jar \ - 1000 + 1000 -# Run a Python application on a Spark standalone cluster +# Run a Python application on a Spark standalone cluster ./bin/spark-submit \ --master spark://207.184.161.138:7077 \ examples/src/main/python/pi.py \ - 1000 + 1000 -# Run on a Mesos cluster in cluster deploy mode with supervise +# Run on a Mesos cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master mesos://207.184.161.138:7077 \ @@ -247,7 +247,7 @@ run it with --help. 
Here are a few examples of common options: --executor-memory 20G \ --total-executor-cores 100 \ http://path/to/examples.jar \ - 1000 + 1000 Master URLs http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/tuning.html -- diff --git a/site/docs/2.1.0/tuning.html b/site/docs/2.1.0/tuning.html index ca4ad9f..33a6316 100644 --- a/site/docs/2.1.0/tuning.html +++ b/site/docs/2.1.0/tuning.html @@ -129,23 +129,23 @@ - Data Serialization - Memory Tuning - Memory Management Overview - Determining Memory Consumption - Tuning Data Structures - Serialized RDD Storage - Garbage Collection Tuning + Data Serialization + Memory Tuning + Memory Management Overview + Determining Memory Consumption + Tuning Data Structures + Serialized RDD Storage + Garbage Collection Tuning - Other Considerations - Level of Parallelism - Memory Usage of Reduce Tasks - Broadcasting Large Variables - Data Locality + Other Considerations + Level of Parallelism + Memory Usage of Reduce Tasks + Broadcasting Large Variables + Data Locality - Summary + Summary Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked @@ -194,9 +194,9 @@ in the AllScalaRegistrar from the Twi (https://github.com/twitter/chill) To register your own custom classes with Kryo, use the registerKryoClasses method. -val conf = new SparkConf().setMaster(...).setAppName(...) +val conf = new SparkConf().setMaster(...).setAppName(...)
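The general `spark-submit` template in submitting-applications.html above follows one rule about argument order:

- every option comes before the application JAR (or Python file);
- everything after the application JAR or Python file is passed to the application.

As a hedged illustration, the Python helper below assembles arguments in that order, as a list that could be handed to `subprocess.run`. `build_spark_submit_cmd` is hypothetical and not part of Spark. It only emits the `--class`, `--master`, `--deploy-mode` and `--conf` flags that appear in the template.

```python
from typing import Dict, List, Optional, Sequence


def build_spark_submit_cmd(
    application: str,
    master: str,
    main_class: Optional[str] = None,
    deploy_mode: Optional[str] = None,
    conf: Optional[Dict[str, str]] = None,
    app_args: Sequence[str] = (),
    spark_submit: str = "./bin/spark-submit",
) -> List[str]:
    """Assemble a spark-submit argument list (hypothetical helper, not part of Spark)."""
    cmd = [spark_submit]
    if main_class is not None:
        # Omitted for Python applications, which have no main class.
        cmd += ["--class", main_class]
    cmd += ["--master", master]
    if deploy_mode is not None:
        cmd += ["--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    # All spark-submit options precede the application; the rest goes to the app.
    cmd.append(application)
    cmd.extend(app_args)
    return cmd
```

Passing such a list to `subprocess.run(cmd, check=True)` also avoids shell line continuations. In a POSIX shell, a backslash only continues a line when it is the last character on it. The inline comment after the backslash in the YARN example therefore ends the command at that line.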
[25/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294 This version is built from the docs source code generated by applying https://github.com/apache/spark/pull/16294 to v2.1.0 (so, other changes in branch 2.1 will not affect the doc). Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/d2bcf185 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/d2bcf185 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/d2bcf185 Branch: refs/heads/asf-site Commit: d2bcf1854b0e0409495e2f1d3c6beaad923f6e6b Parents: ecf94f2 Author: Yin Huai Authored: Wed Dec 28 14:32:43 2016 -0800 Committer: Yin Huai Committed: Wed Dec 28 14:32:43 2016 -0800 -- site/docs/2.1.0/building-spark.html | 46 +- site/docs/2.1.0/building-with-maven.html| 14 +- site/docs/2.1.0/configuration.html | 52 +- site/docs/2.1.0/ec2-scripts.html| 174 site/docs/2.1.0/graphx-programming-guide.html | 198 ++--- site/docs/2.1.0/hadoop-provided.html| 14 +- .../img/structured-streaming-watermark.png | Bin 0 -> 252000 bytes site/docs/2.1.0/img/structured-streaming.pptx | Bin 1105413 -> 1113902 bytes site/docs/2.1.0/job-scheduling.html | 40 +- site/docs/2.1.0/ml-advanced.html| 10 +- .../2.1.0/ml-classification-regression.html | 838 +- site/docs/2.1.0/ml-clustering.html | 124 +-- site/docs/2.1.0/ml-collaborative-filtering.html | 56 +- site/docs/2.1.0/ml-features.html| 764 site/docs/2.1.0/ml-migration-guides.html| 16 +- site/docs/2.1.0/ml-pipeline.html| 178 ++-- site/docs/2.1.0/ml-tuning.html | 172 ++-- site/docs/2.1.0/mllib-clustering.html | 186 ++-- .../2.1.0/mllib-collaborative-filtering.html| 48 +- site/docs/2.1.0/mllib-data-types.html | 208 ++--- site/docs/2.1.0/mllib-decision-tree.html| 94 +- .../2.1.0/mllib-dimensionality-reduction.html | 28 +- site/docs/2.1.0/mllib-ensembles.html| 182 ++-- site/docs/2.1.0/mllib-evaluation-metrics.html | 302 +++ site/docs/2.1.0/mllib-feature-extraction.html | 122 +-- 
.../2.1.0/mllib-frequent-pattern-mining.html| 28 +- site/docs/2.1.0/mllib-isotonic-regression.html | 38 +- site/docs/2.1.0/mllib-linear-methods.html | 174 ++-- site/docs/2.1.0/mllib-naive-bayes.html | 24 +- site/docs/2.1.0/mllib-optimization.html | 50 +- site/docs/2.1.0/mllib-pmml-model-export.html| 35 +- site/docs/2.1.0/mllib-statistics.html | 180 ++-- site/docs/2.1.0/programming-guide.html | 302 +++ site/docs/2.1.0/quick-start.html| 166 ++-- site/docs/2.1.0/running-on-mesos.html | 52 +- site/docs/2.1.0/running-on-yarn.html| 27 +- site/docs/2.1.0/spark-standalone.html | 30 +- site/docs/2.1.0/sparkr.html | 145 ++-- site/docs/2.1.0/sql-programming-guide.html | 819 +- site/docs/2.1.0/storage-openstack-swift.html| 12 +- site/docs/2.1.0/streaming-custom-receivers.html | 26 +- .../2.1.0/streaming-kafka-0-10-integration.html | 52 +- .../docs/2.1.0/streaming-programming-guide.html | 416 - .../structured-streaming-kafka-integration.html | 44 +- .../structured-streaming-programming-guide.html | 864 --- site/docs/2.1.0/submitting-applications.html| 36 +- site/docs/2.1.0/tuning.html | 30 +- 47 files changed, 3926 insertions(+), 3490 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/building-spark.html -- diff --git a/site/docs/2.1.0/building-spark.html b/site/docs/2.1.0/building-spark.html index b3a720c..5c20245 100644 --- a/site/docs/2.1.0/building-spark.html +++ b/site/docs/2.1.0/building-spark.html @@ -127,33 +127,33 @@ - Building Apache Spark - Apache Maven - Setting up Maven's Memory Usage - build/mvn + Building Apache Spark + Apache Maven + Setting up Maven's Memory Usage + build/mvn - Building a Runnable Distribution - Specifying the Hadoop Version - Building With Hive and JDBC Support - Packaging without Hadoop Dependencies for YARN - Building with Mesos support - Building for Scala 2.10 - Building submodules individually - Continuous Compilation - Speeding up Compilation with Zinc - Building with SBT - Encrypted
[11/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-linear-methods.html -- diff --git a/site/docs/2.1.0/mllib-linear-methods.html b/site/docs/2.1.0/mllib-linear-methods.html index 46a1a25..428d778 100644 --- a/site/docs/2.1.0/mllib-linear-methods.html +++ b/site/docs/2.1.0/mllib-linear-methods.html @@ -307,23 +307,23 @@ - Mathematical formulation - Loss functions - Regularizers - Optimization + Mathematical formulation + Loss functions + Regularizers + Optimization - Classification - Linear Support Vector Machines (SVMs) - Logistic regression + Classification + Linear Support Vector Machines (SVMs) + Logistic regression - Regression - Linear least squares, Lasso, and ridge regression - Streaming linear regression + Regression + Linear least squares, Lasso, and ridge regression + Streaming linear regression - Implementation (developer) + Implementation (developer) \[ @@ -489,7 +489,7 @@ error. Refer to the SVMWithSGD Scala docs and SVMModel Scala docs for details on the API. -import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD} +import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.util.MLUtils @@ -534,14 +534,14 @@ this way as well. For example, the following code produces an L1 regularized variant of SVMs with regularization parameter set to 0.1, and runs the training algorithm for 200 iterations. -import org.apache.spark.mllib.optimization.L1Updater +import org.apache.spark.mllib.optimization.L1Updater val svmAlg = new SVMWithSGD() svmAlg.optimizer .setNumIterations(200) .setRegParam(0.1) .setUpdater(new L1Updater) -val modelL1 = svmAlg.run(training) +val modelL1 = svmAlg.run(training) @@ -554,7 +554,7 @@ that is equivalent to the provided example in Scala is given below: Refer to the SVMWithSGD Java docs and SVMModel Java docs for details on the API. 
-import scala.Tuple2; +import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; @@ -591,7 +591,7 @@ that is equivalent to the provided example in Scala is given below: // Get evaluation metrics. BinaryClassificationMetrics metrics = - new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); double auROC = metrics.areaUnderROC(); System.out.println("Area under ROC = " + auROC); @@ -610,14 +610,14 @@ this way as well. For example, the following code produces an L1 regularized variant of SVMs with regularization parameter set to 0.1, and runs the training algorithm for 200 iterations. -import org.apache.spark.mllib.optimization.L1Updater; +import org.apache.spark.mllib.optimization.L1Updater; -SVMWithSGD svmAlg = new SVMWithSGD(); +SVMWithSGD svmAlg = new SVMWithSGD(); svmAlg.optimizer() .setNumIterations(200) .setRegParam(0.1) - .setUpdater(new L1Updater()); -final SVMModel modelL1 = svmAlg.run(training.rdd()); + .setUpdater(new L1Updater()); +final SVMModel modelL1 = svmAlg.run(training.rdd()); In order to run the above application, follow the instructions provided in the Self-Contained @@ -632,28 +632,28 @@ and make predictions with the resulting model to compute the training error. Refer to the SVMWithSGD Python docs and SVMModel Python docs for more details on the API.
-from pyspark.mllib.classification import SVMWithSGD, SVMModel +from pyspark.mllib.classification import SVMWithSGD, SVMModel from pyspark.mllib.regression import LabeledPoint -# Load and parse the data +# Load and parse the data def parsePoint(line): -values = [float(x) for x in line.split(' ')] +values = [float(x) for x in line.split(' ')] return LabeledPoint(values[0], values[1:]) -data = sc.textFile("data/mllib/sample_svm_data.txt") +data = sc.textFile("data/mllib/sample_svm_data.txt") parsedData = data.map(parsePoint) -# Build the model +# Build the model model = SVMWithSGD.train(parsedData, iterations=100) -# Evaluating the model on training data +# Evaluating the model on training data labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features))) trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count()) -print("Training Error = " + str(trainErr)) +print("Training Error = " + str(trainErr)) -# Save and load model -model.save(sc, "target/tmp/pythonSVMWithSGDModel") -sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel") +# Save and load model +model.save(sc, "target/tmp/pythonSVMWithSGDModel") +sameModel =
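The L1-regularized SVM example above plugs in `L1Updater` with `setRegParam(0.1)`. As the MLlib scaladoc describes it, that updater does two things per iteration:

1. It takes a gradient step whose size decays as `stepSize / sqrt(iter)`.
2. It applies the L1 proximal operator (soft-thresholding) with threshold `regParam * stepSize / sqrt(iter)`.

The pure-Python sketch below illustrates one such update on plain lists. `l1_sgd_step` is a hypothetical name, and this is not MLlib's implementation.

```python
import math
from typing import List


def l1_sgd_step(weights: List[float], gradient: List[float], step_size: float,
                iteration: int, reg_param: float) -> List[float]:
    """One gradient step followed by L1 soft-thresholding (illustrative sketch)."""
    # Step size decays with the square root of the iteration number (iteration >= 1).
    this_step = step_size / math.sqrt(iteration)
    shrinkage = reg_param * this_step
    updated = []
    for w, g in zip(weights, gradient):
        w = w - this_step * g  # plain gradient step
        # Soft-thresholding: shrink towards zero; magnitudes below the
        # threshold become exactly zero.
        updated.append(math.copysign(max(0.0, abs(w) - shrinkage), w))
    return updated
```

Weights whose magnitude falls below the shrinkage threshold are set to exactly zero. This is why the L1 variant tends to produce sparser models than the default L2 updater.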
[23/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/hadoop-provided.html -- diff --git a/site/docs/2.1.0/hadoop-provided.html b/site/docs/2.1.0/hadoop-provided.html index ff7afb7..9d77cf0 100644 --- a/site/docs/2.1.0/hadoop-provided.html +++ b/site/docs/2.1.0/hadoop-provided.html @@ -133,16 +133,16 @@ Apache Hadoop For Apache distributions, you can use Hadoop's classpath command. For instance: -### in conf/spark-env.sh ### +### in conf/spark-env.sh ### -# If hadoop binary is on your PATH -export SPARK_DIST_CLASSPATH=$(hadoop classpath) +# If hadoop binary is on your PATH +export SPARK_DIST_CLASSPATH=$(hadoop classpath) -# With explicit path to hadoop binary -export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/bin/hadoop classpath) +# With explicit path to hadoop binary +export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/bin/hadoop classpath) -# Passing a Hadoop configuration directory -export SPARK_DIST_CLASSPATH=$(hadoop --config /path/to/configs classpath) +# Passing a Hadoop configuration directory +export SPARK_DIST_CLASSPATH=$(hadoop --config /path/to/configs classpath) http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/img/structured-streaming-watermark.png -- diff --git a/site/docs/2.1.0/img/structured-streaming-watermark.png b/site/docs/2.1.0/img/structured-streaming-watermark.png new file mode 100644 index 000..f21fbda Binary files /dev/null and b/site/docs/2.1.0/img/structured-streaming-watermark.png differ http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/img/structured-streaming.pptx -- diff --git a/site/docs/2.1.0/img/structured-streaming.pptx b/site/docs/2.1.0/img/structured-streaming.pptx index 6aad2ed..f5bdfc0 100644 Binary files a/site/docs/2.1.0/img/structured-streaming.pptx and b/site/docs/2.1.0/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/job-scheduling.html -- diff --git
a/site/docs/2.1.0/job-scheduling.html b/site/docs/2.1.0/job-scheduling.html index 53161c2..9651607 100644 --- a/site/docs/2.1.0/job-scheduling.html +++ b/site/docs/2.1.0/job-scheduling.html @@ -127,24 +127,24 @@ - Overview - Scheduling Across Applications - Dynamic Resource Allocation - Configuration and Setup - Resource Allocation Policy - Request Policy - Remove Policy + Overview + Scheduling Across Applications + Dynamic Resource Allocation + Configuration and Setup + Resource Allocation Policy + Request Policy + Remove Policy - Graceful Decommission of Executors + Graceful Decommission of Executors - Scheduling Within an Application - Fair Scheduler Pools - Default Behavior of Pools - Configuring Pool Properties + Scheduling Within an Application + Fair Scheduler Pools + Default Behavior of Pools + Configuring Pool Properties @@ -321,9 +321,9 @@ mode is best for multi-user settings. To enable the fair scheduler, simply set the spark.scheduler.mode property to FAIR when configuring a SparkContext: -val conf = new SparkConf().setMaster(...).setAppName(...) +val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") -val sc = new SparkContext(conf) +val sc = new SparkContext(conf) Fair Scheduler Pools @@ -337,15 +337,15 @@ many concurrent jobs they have instead of giving jobs equal shares. Thi adding the spark.scheduler.pool local property to the SparkContext in the thread that's submitting them. This is done as follows: -// Assuming sc is your SparkContext variable -sc.setLocalProperty("spark.scheduler.pool", "pool1") +// Assuming sc is your SparkContext variable +sc.setLocalProperty("spark.scheduler.pool", "pool1") After setting this local property, all jobs submitted within this thread (by calls in this thread to RDD.save, count, collect, etc) will use this pool name. The setting is per-thread to make it easy to have a thread run multiple jobs on behalf of the same user.
If you'd like to clear the pool that a thread is associated with, simply call: -sc.setLocalProperty("spark.scheduler.pool", null) +sc.setLocalProperty("spark.scheduler.pool", null) Default Behavior of Pools @@ -379,12 +379,12 @@ of the cluster. By default, each pool's minShare is 0. and setting a spark.scheduler.allocation.file property in your SparkConf. -conf.set("spark.scheduler.allocation.file",
[12/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-feature-extraction.html -- diff --git a/site/docs/2.1.0/mllib-feature-extraction.html b/site/docs/2.1.0/mllib-feature-extraction.html index 4726b37..f8cd98e 100644 --- a/site/docs/2.1.0/mllib-feature-extraction.html +++ b/site/docs/2.1.0/mllib-feature-extraction.html @@ -307,32 +307,32 @@ - TF-IDF - Word2Vec - Model - Example + TF-IDF + Word2Vec + Model + Example - StandardScaler - Model Fitting - Example + StandardScaler + Model Fitting + Example - Normalizer - Example + Normalizer + Example - ChiSqSelector - Model Fitting - Example + ChiSqSelector + Model Fitting + Example - ElementwiseProduct - Example + ElementwiseProduct + Example - PCA - Example + PCA + Example @@ -390,7 +390,7 @@ Each record could be an iterable of strings or other types. Refer to the HashingTF Scala docs for details on the API. -import org.apache.spark.mllib.feature.{HashingTF, IDF} +import org.apache.spark.mllib.feature.{HashingTF, IDF} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD @@ -424,24 +424,24 @@ Each record could be an iterable of strings or other types. Refer to the HashingTF Python docs for details on the API. -from pyspark.mllib.feature import HashingTF, IDF +from pyspark.mllib.feature import HashingTF, IDF -# Load documents (one per line). -documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" ")) +# Load documents (one per line). +documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" ")) hashingTF = HashingTF() tf = hashingTF.transform(documents) -# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes: -# First to compute the IDF vector and second to scale the term frequencies by IDF. +# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes: +# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache() idf = IDF().fit(tf) tfidf = idf.transform(tf) -# spark.mllib's IDF implementation provides an option for ignoring terms -# which occur in less than a minimum number of documents. -# In such cases, the IDF for these terms is set to 0. -# This feature can be used by passing the minDocFreq value to the IDF constructor. +# spark.mllib's IDF implementation provides an option for ignoring terms +# which occur in less than a minimum number of documents. +# In such cases, the IDF for these terms is set to 0. +# This feature can be used by passing the minDocFreq value to the IDF constructor. idfIgnore = IDF(minDocFreq=2).fit(tf) tfidfIgnore = idfIgnore.transform(tf) @@ -467,7 +467,7 @@ skip-gram model is to maximize the average log-likelihood \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \] -where $k$ is the size of the training window. +where $k$ is the size of the training window. In the skip-gram model, every word $w$ is associated with two vectors $u_w$ and $v_w$ which are vector representations of $w$ as word and context respectively. The probability of correctly @@ -475,7 +475,7 @@ predicting word $w_i$ given word $w_j$ is determined by the softmax model, which \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \] -where $V$ is the vocabulary size. +where $V$ is the vocabulary size. The skip-gram model with softmax is expensive because the cost of computing $\log p(w_i | w_j)$ is proportional to $V$, which can be easily in order of millions. To speed up training of Word2Vec, @@ -488,13 +488,13 @@ $O(\log(V))$ construct a Word2Vec instance and then fit a Word2VecModel with the input data. Finally, we display the top 40 synonyms of the specified word. To run the example, first download the text8 data (http://mattmahoney.net/dc/text8.zip) and extract it to your preferred directory. -Here we assume the extracted file is text8 and in same directory as you run the spark shell.
+Here we assume the extracted file is text8 and in same directory as you run the spark shell. Refer to the Word2Vec Scala docs for details on the API. -import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} +import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} val input = sc.textFile(data/mllib/sample_lda_data.txt).map(line = line.split( ).toSeq) @@ -505,7 +505,7 @@ Here we assume the extracted file is text8 and in same directory as val synonyms = model.findSynonyms(1, 5) for((synonym, cosineSimilarity) - synonyms) { -
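The IDF step in the Python TF-IDF example above, including the minDocFreq option, can be illustrated without Spark. The following is a plain-Python sketch, not the spark.mllib API. It counts raw terms instead of hashing them. It assumes the smoothed formula IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)) from the spark.mllib TF-IDF documentation, and sets the IDF to 0 for any term whose document frequency is below minDocFreq. The function names are hypothetical.

```python
import math
from collections import Counter


def term_frequencies(docs):
    """Raw term counts per document (spark.mllib hashes terms; this sketch does not)."""
    return [Counter(doc) for doc in docs]


def inverse_document_frequency(docs, min_doc_freq=0):
    """Smoothed IDF log((m + 1) / (df + 1)), forced to 0 when df < min_doc_freq."""
    m = len(docs)
    df = Counter()
    for doc in docs:
        df.update(set(doc))  # count each term at most once per document
    return {
        term: (math.log((m + 1) / (count + 1)) if count >= min_doc_freq else 0.0)
        for term, count in df.items()
    }


def tf_idf(docs, min_doc_freq=0):
    """Scale each document's term counts by the corpus-wide IDF."""
    idf = inverse_document_frequency(docs, min_doc_freq)
    return [{t: c * idf[t] for t, c in tf.items()} for tf in term_frequencies(docs)]


docs = [["spark", "is", "fast"], ["spark", "is", "general"], ["hadoop", "is", "old"]]
plain = tf_idf(docs)
ignored = tf_idf(docs, min_doc_freq=2)
print(plain[0])
print(ignored[0])
```

In this toy corpus, "is" occurs in every document and gets an IDF of log(4/4) = 0. With min_doc_freq=2, the single-document term "fast" is also zeroed out.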
[17/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-clustering.html -- diff --git a/site/docs/2.1.0/mllib-clustering.html b/site/docs/2.1.0/mllib-clustering.html index 9667606..1b50dab 100644 --- a/site/docs/2.1.0/mllib-clustering.html +++ b/site/docs/2.1.0/mllib-clustering.html @@ -366,12 +366,12 @@ models are trained for each cluster). The spark.mllib package supports the following models: - K-means - Gaussian mixture - Power iteration clustering (PIC) - Latent Dirichlet allocation (LDA) - Bisecting k-means - Streaming k-means + K-means + Gaussian mixture + Power iteration clustering (PIC) + Latent Dirichlet allocation (LDA) + Bisecting k-means + Streaming k-means K-means @@ -408,7 +408,7 @@ optimal k is usually one where there is an elbow in the W Refer to the KMeans Scala docs and KMeansModel Scala docs for details on the API. -import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} +import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors // Load and parse the data @@ -440,7 +440,7 @@ that is equivalent to the provided example in Scala is given below: Refer to the KMeans Java docs and KMeansModel Java docs for details on the API. 
-import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; @@ -470,7 +470,7 @@ that is equivalent to the provided example in Scala is given below: KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); System.out.println(Cluster centers:); -for (Vector center: clusters.clusterCenters()) { +for (Vector center: clusters.clusterCenters()) { System.out.println( + center); } double cost = clusters.computeCost(parsedData.rdd()); @@ -498,29 +498,29 @@ fact the optimal k is usually one where there is an elbow Refer to the KMeans Python docs and KMeansModel Python docs for more details on the API. -from numpy import array +from numpy import array from math import sqrt from pyspark.mllib.clustering import KMeans, KMeansModel -# Load and parse the data -data = sc.textFile(data/mllib/kmeans_data.txt) -parsedData = data.map(lambda line: array([float(x) for x in line.split( )])) +# Load and parse the data +data = sc.textFile(data/mllib/kmeans_data.txt) +parsedData = data.map(lambda line: array([float(x) for x in line.split( )])) -# Build the model (cluster the data) -clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode=random) +# Build the model (cluster the data) +clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode=random) -# Evaluate clustering by computing Within Set Sum of Squared Errors +# Evaluate clustering by computing Within Set Sum of Squared Errors def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) -print(Within Set Sum of Squared Error = + str(WSSSE)) +print(Within Set Sum of Squared Error = + str(WSSSE)) -# Save and load model -clusters.save(sc, 
target/org/apache/spark/PythonKMeansExample/KMeansModel) -sameModel = KMeansModel.load(sc, target/org/apache/spark/PythonKMeansExample/KMeansModel) +# Save and load model +clusters.save(sc, target/org/apache/spark/PythonKMeansExample/KMeansModel) +sameModel = KMeansModel.load(sc, target/org/apache/spark/PythonKMeansExample/KMeansModel) Find full example code at "examples/src/main/python/mllib/k_means_example.py" in the Spark repo. @@ -554,7 +554,7 @@ to the algorithm. We then output the parameters of the mixture model. Refer to the GaussianMixture Scala docs and GaussianMixtureModel Scala docs for details on the API. -import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel} +import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel} import org.apache.spark.mllib.linalg.Vectors // Load and parse the data @@ -587,7 +587,7 @@ that is equivalent to the provided example in Scala is given below: Refer to the GaussianMixture Java docs and GaussianMixtureModel Java docs for details on the API. -import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.GaussianMixture; import org.apache.spark.mllib.clustering.GaussianMixtureModel; @@ -612,7 +612,7 @@ that is equivalent to the provided example in Scala is given below: parsedData.cache(); // Cluster the data into two classes using GaussianMixture -GaussianMixtureModel
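The K-means section trains a model and scores it with the Within Set Sum of Squared Errors (WSSSE). The sketch below is plain Python, not spark.mllib code. It runs a few iterations of Lloyd's algorithm and computes WSSSE as the sum of squared Euclidean distances from each point to its nearest center. The initial centers are passed in explicitly so that the result is deterministic. That is an assumption of this sketch: spark.mllib's KMeans.train defaults to k-means|| initialization, and the Python example uses "random".

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def closest(point, centers):
    """Index of the center nearest to point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))


def lloyd_kmeans(points, centers, iterations=10):
    centers = [list(c) for c in centers]
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            clusters[closest(p, centers)].append(p)
        # Move each center to the mean of its points; keep it if its cluster is empty.
        centers = [
            [sum(coord) / len(members) for coord in zip(*members)] if members else c
            for c, members in zip(centers, clusters)
        ]
    return centers


def wssse(points, centers):
    """Sum of squared distances from every point to its nearest center."""
    return sum(squared_distance(p, centers[closest(p, centers)]) for p in points)


points = [[0.0, 0.0], [0.1, 0.1], [0.2, 0.2], [9.0, 9.0], [9.1, 9.1], [9.2, 9.2]]
centers = lloyd_kmeans(points, centers=[[0.0, 0.0], [9.0, 9.0]])
print(centers)
print(wssse(points, centers))
```

Note that error() in the Python example above returns the unsquared Euclidean distance, so its total is a sum of distances rather than of squared errors. This sketch squares each distance.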
[13/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-evaluation-metrics.html -- diff --git a/site/docs/2.1.0/mllib-evaluation-metrics.html b/site/docs/2.1.0/mllib-evaluation-metrics.html index 4bc636d..0d5bb3b 100644 --- a/site/docs/2.1.0/mllib-evaluation-metrics.html +++ b/site/docs/2.1.0/mllib-evaluation-metrics.html @@ -307,20 +307,20 @@ - Classification model evaluation - Binary classification - Threshold tuning + Classification model evaluation + Binary classification + Threshold tuning - Multiclass classification - Label based metrics + Multiclass classification + Label based metrics - Multilabel classification - Ranking systems + Multilabel classification + Ranking systems - Regression model evaluation + Regression model evaluation spark.mllib comes with a number of machine learning algorithms that can be used to learn from and make predictions @@ -421,7 +421,7 @@ data, and evaluate the performance of the algorithm by several binary evaluation Refer to the LogisticRegressionWithLBFGS Scala docs and BinaryClassificationMetrics Scala docs for details on the API. 
-import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS +import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils @@ -453,13 +453,13 @@ data, and evaluate the performance of the algorithm by several binary evaluation // Precision by threshold val precision = metrics.precisionByThreshold precision.foreach { case (t, p) = - println(sThreshold: $t, Precision: $p) + println(sThreshold: $t, Precision: $p) } // Recall by threshold val recall = metrics.recallByThreshold recall.foreach { case (t, r) = - println(sThreshold: $t, Recall: $r) + println(sThreshold: $t, Recall: $r) } // Precision-Recall Curve @@ -468,13 +468,13 @@ data, and evaluate the performance of the algorithm by several binary evaluation // F-measure val f1Score = metrics.fMeasureByThreshold f1Score.foreach { case (t, f) = - println(sThreshold: $t, F-score: $f, Beta = 1) + println(sThreshold: $t, F-score: $f, Beta = 1) } val beta = 0.5 val fScore = metrics.fMeasureByThreshold(beta) f1Score.foreach { case (t, f) = - println(sThreshold: $t, F-score: $f, Beta = 0.5) + println(sThreshold: $t, F-score: $f, Beta = 0.5) } // AUPRC @@ -498,7 +498,7 @@ data, and evaluate the performance of the algorithm by several binary evaluation Refer to the LogisticRegressionModel Java docs and LogisticRegressionWithLBFGS Java docs for details on the API. -import scala.Tuple2; +import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; @@ -518,7 +518,7 @@ data, and evaluate the performance of the algorithm by several binary evaluation JavaRDDLabeledPoint test = splits[1]; // Run training algorithm to build the model. 
-final LogisticRegressionModel model = new LogisticRegressionWithLBFGS() +final LogisticRegressionModel model = new LogisticRegressionWithLBFGS() .setNumClasses(2) .run(training.rdd()); @@ -538,7 +538,7 @@ data, and evaluate the performance of the algorithm by several binary evaluation // Get evaluation metrics. BinaryClassificationMetrics metrics = - new BinaryClassificationMetrics(predictionAndLabels.rdd()); + new BinaryClassificationMetrics(predictionAndLabels.rdd()); // Precision by threshold JavaRDDTuple2Object, Object precision = metrics.precisionByThreshold().toJavaRDD(); @@ -564,7 +564,7 @@ data, and evaluate the performance of the algorithm by several binary evaluation new FunctionTuple2Object, Object, Double() { @Override public Double call(Tuple2Object, Object t) { - return new Double(t._1().toString()); + return new Double(t._1().toString()); } } ); @@ -590,34 +590,34 @@ data, and evaluate the performance of the algorithm by several binary evaluation Refer to the BinaryClassificationMetrics Python docs and LogisticRegressionWithLBFGS Python docs for more details on the API. -from pyspark.mllib.classification import LogisticRegressionWithLBFGS +from pyspark.mllib.classification import LogisticRegressionWithLBFGS from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.mllib.regression import LabeledPoint -# Several of the methods available in scala are currently missing from pyspark -# Load training data in LIBSVM format +# Several of the methods available in scala are currently missing from pyspark +#
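BinaryClassificationMetrics reports precision, recall and F-measure at each threshold. The following plain-Python sketch shows what those per-threshold numbers mean; it is not Spark's implementation. It assumes three things: each distinct score serves as a threshold, a score greater than or equal to the threshold counts as a positive prediction, and F-beta = (1 + beta^2) * P * R / (beta^2 * P + R).

```python
def metrics_by_threshold(score_and_labels, beta=1.0):
    """Return {threshold: (precision, recall, f_beta)} for each distinct score."""
    positives = sum(1 for _, label in score_and_labels if label == 1.0)
    b2 = beta * beta
    results = {}
    for t in sorted({s for s, _ in score_and_labels}, reverse=True):
        tp = sum(1 for s, label in score_and_labels if s >= t and label == 1.0)
        fp = sum(1 for s, label in score_and_labels if s >= t and label != 1.0)
        precision = tp / (tp + fp)  # t is one of the scores, so tp + fp > 0
        recall = tp / positives if positives else 0.0
        denom = b2 * precision + recall
        f_beta = (1 + b2) * precision * recall / denom if denom else 0.0
        results[t] = (precision, recall, f_beta)
    return results


data = [(0.9, 1.0), (0.8, 0.0), (0.6, 1.0), (0.3, 0.0)]
for t, (p, r, f1) in metrics_by_threshold(data).items():
    print(f"Threshold: {t}, Precision: {p:.3f}, Recall: {r:.3f}, F1: {f1:.3f}")
```

Passing beta=0.5 weights precision more heavily than recall, which corresponds to the fMeasureByThreshold(beta) call in the Scala example.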
[08/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/quick-start.html -- diff --git a/site/docs/2.1.0/quick-start.html b/site/docs/2.1.0/quick-start.html index 76e67e1..9d5fad7 100644 --- a/site/docs/2.1.0/quick-start.html +++ b/site/docs/2.1.0/quick-start.html @@ -129,14 +129,14 @@ - Interactive Analysis with the Spark Shell - Basics - More on RDD Operations - Caching + Interactive Analysis with the Spark Shell + Basics + More on RDD Operations + Caching - Self-Contained Applications - Where to Go from Here + Self-Contained Applications + Where to Go from Here This tutorial provides a quick introduction to using Spark. We will first introduce the API through Sparks @@ -164,26 +164,26 @@ or Python. Start it by running the following in the Spark directory: Sparks primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. Lets make a new RDD from the text of the README file in the Spark source directory: -scala val textFile = sc.textFile(README.md) -textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at console:25 +scala val textFile = sc.textFile(README.md) +textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at console:25 RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Lets start with a few actions: -scala textFile.count() // Number of items in this RDD +scala textFile.count() // Number of items in this RDD res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala textFile.first() // First item in this RDD -res1: String = # Apache Spark +res1: String = # Apache Spark Now lets use a transformation. We will use the filter transformation to return a new RDD with a subset of the items in the file. 
-scala val linesWithSpark = textFile.filter(line = line.contains(Spark)) -linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at console:27 +scala val linesWithSpark = textFile.filter(line = line.contains(Spark)) +linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at console:27 We can chain together transformations and actions: -scala textFile.filter(line = line.contains(Spark)).count() // How many lines contain Spark? -res3: Long = 15 +scala textFile.filter(line = line.contains(Spark)).count() // How many lines contain Spark? +res3: Long = 15 @@ -193,24 +193,24 @@ or Python. Start it by running the following in the Spark directory: Sparks primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. Lets make a new RDD from the text of the README file in the Spark source directory: - textFile = sc.textFile(README.md) + textFile = sc.textFile(README.md) RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Lets start with a few actions: - textFile.count() # Number of items in this RDD + textFile.count() # Number of items in this RDD 126 - textFile.first() # First item in this RDD -u# Apache Spark + textFile.first() # First item in this RDD +u# Apache Spark Now lets use a transformation. We will use the filter transformation to return a new RDD with a subset of the items in the file. - linesWithSpark = textFile.filter(lambda line: Spark in line) + linesWithSpark = textFile.filter(lambda line: Spark in line) We can chain together transformations and actions: - textFile.filter(lambda line: Spark in line).count() # How many lines contain Spark? -15 + textFile.filter(lambda line: Spark in line).count() # How many lines contain Spark? +15 @@ -221,38 +221,38 @@ or Python. 
Start it by running the following in the Spark directory: -scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) -res4: Long = 15 +scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) +res4: Long = 15 This first maps a line to an integer value, creating a new RDD. reduce is called on that RDD to find the largest line count. The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use Math.max() function to make this code easier to understand: -scala> import java.lang.Math +scala>
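The quick-start steps above can be mirrored on an ordinary Python list to show what each step computes: filter and count the lines containing "Spark", then map each line to its word count and reduce with a max. This is only a local analogy; it does not use Spark. It assumes a hypothetical in-memory list of lines in place of the README.md RDD.

```python
from functools import reduce

# A hypothetical stand-in for the lines of README.md.
lines = [
    "# Apache Spark",
    "Spark is a fast and general cluster computing system for Big Data.",
    "It provides high-level APIs in Scala, Java, Python, and R.",
]

# Analogue of: textFile.filter(line => line.contains("Spark")).count()
lines_with_spark = sum(1 for line in lines if "Spark" in line)

# Analogue of: textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
word_counts = map(lambda line: len(line.split(" ")), lines)
most_words = reduce(lambda a, b: a if a > b else b, word_counts)

print(lines_with_spark, most_words)
```

The reduce step returns the maximum number of words found on any single line. Replacing the lambda with the built-in max gives the same result, just as Math.max does in the Scala version.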
[14/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-decision-tree.html -- diff --git a/site/docs/2.1.0/mllib-decision-tree.html b/site/docs/2.1.0/mllib-decision-tree.html index 1a3d865..991610e 100644 --- a/site/docs/2.1.0/mllib-decision-tree.html +++ b/site/docs/2.1.0/mllib-decision-tree.html @@ -307,23 +307,23 @@ - Basic algorithm - Node impurity and information gain - Split candidates - Stopping rule + Basic algorithm + Node impurity and information gain + Split candidates + Stopping rule - Usage tips - Problem specification parameters - Stopping criteria - Tunable parameters - Caching and checkpointing + Usage tips + Problem specification parameters + Stopping criteria + Tunable parameters + Caching and checkpointing - Scaling - Examples - Classification - Regression + Scaling + Examples + Classification + Regression @@ -548,7 +548,7 @@ maximum tree depth of 5. The test error is calculated to measure the algorithm a Refer to the DecisionTree Scala docs and DecisionTreeModel Scala docs for details on the API. -import org.apache.spark.mllib.tree.DecisionTree +import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.tree.model.DecisionTreeModel import org.apache.spark.mllib.util.MLUtils @@ -588,7 +588,7 @@ maximum tree depth of 5. The test error is calculated to measure the algorithm a Refer to the DecisionTree Java docs and DecisionTreeModel Java docs for details on the API. -import java.util.HashMap; +import java.util.HashMap; import java.util.Map; import scala.Tuple2; @@ -604,8 +604,8 @@ maximum tree depth of 5. 
The test error is calculated to measure the algorithm a import org.apache.spark.mllib.tree.model.DecisionTreeModel; import org.apache.spark.mllib.util.MLUtils; -SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample"); -JavaSparkContext jsc = new JavaSparkContext(sparkConf); +SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample"); +JavaSparkContext jsc = new JavaSparkContext(sparkConf); // Load and parse the data file. String datapath = "data/mllib/sample_libsvm_data.txt"; @@ -657,30 +657,30 @@ maximum tree depth of 5. The test error is calculated to measure the algorithm a Refer to the DecisionTree Python docs and DecisionTreeModel Python docs for more details on the API. -from pyspark.mllib.tree import DecisionTree, DecisionTreeModel +from pyspark.mllib.tree import DecisionTree, DecisionTreeModel from pyspark.mllib.util import MLUtils -# Load and parse the data file into an RDD of LabeledPoint. -data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') -# Split the data into training and test sets (30% held out for testing) +# Load and parse the data file into an RDD of LabeledPoint. +data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') +# Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) -# Train a DecisionTree model. -# Empty categoricalFeaturesInfo indicates all features are continuous. +# Train a DecisionTree model. +# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, - impurity='gini', maxDepth=5, maxBins=32) + impurity='gini', maxDepth=5, maxBins=32) -# Evaluate model on test instances and compute test error +# Evaluate model on test instances and compute test error predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count()) -print('Test Error = ' + str(testErr)) -print('Learned classification tree model:') +print('Test Error = ' + str(testErr)) +print('Learned classification tree model:') print(model.toDebugString()) -# Save and load model -model.save(sc, "target/tmp/myDecisionTreeClassificationModel") -sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel") +# Save and load model +model.save(sc, "target/tmp/myDecisionTreeClassificationModel") +sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel") Find full example code at "examples/src/main/python/mllib/decision_tree_classification_example.py" in the Spark repo. @@ -701,7 +701,7 @@ depth of 5. The Mean Squared Error (MSE) is computed at the end to evaluate Refer to the DecisionTree Scala docs and DecisionTreeModel Scala
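The classification example above trains with impurity='gini', and the decision tree page lists "Node impurity and information gain" as part of the basic algorithm. The following plain-Python sketch of those two formulas is an illustration, not spark.mllib's implementation. It computes Gini impurity as 1 - sum of p_i^2 over class frequencies. It computes the information gain of a split as the parent impurity minus the size-weighted impurities of the left and right children.

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 - sum of squared class frequencies."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def information_gain(parent, left, right):
    """Parent impurity minus the size-weighted impurity of the two children."""
    n = len(parent)
    return gini(parent) - len(left) / n * gini(left) - len(right) / n * gini(right)


parent = [0, 0, 0, 1, 1, 1]
print(gini(parent))                                    # 0.5
print(information_gain(parent, [0, 0, 0], [1, 1, 1]))  # 0.5: a perfect split
print(information_gain(parent, [0, 1, 0], [0, 1, 1]))  # a much weaker split
```

At each node, a tree learner compares the gain of every candidate split and keeps the best one. The perfect split above removes all impurity, while the mixed split gains only 1/18.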
[20/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-features.html -- diff --git a/site/docs/2.1.0/ml-features.html b/site/docs/2.1.0/ml-features.html index 64463de..a2f102b 100644 --- a/site/docs/2.1.0/ml-features.html +++ b/site/docs/2.1.0/ml-features.html @@ -318,52 +318,52 @@ Table of Contents - Feature Extractors - TF-IDF - Word2Vec - CountVectorizer + Feature Extractors + TF-IDF + Word2Vec + CountVectorizer - Feature Transformers - Tokenizer - StopWordsRemover - $n$-gram - Binarizer - PCA - PolynomialExpansion - Discrete Cosine Transform (DCT) - StringIndexer - IndexToString - OneHotEncoder - VectorIndexer - Interaction - Normalizer - StandardScaler - MinMaxScaler - MaxAbsScaler - Bucketizer - ElementwiseProduct - SQLTransformer - VectorAssembler - QuantileDiscretizer + Feature Transformers + Tokenizer + StopWordsRemover + $n$-gram + Binarizer + PCA + PolynomialExpansion + Discrete Cosine Transform (DCT) + StringIndexer + IndexToString + OneHotEncoder + VectorIndexer + Interaction + Normalizer + StandardScaler + MinMaxScaler + MaxAbsScaler + Bucketizer + ElementwiseProduct + SQLTransformer + VectorAssembler + QuantileDiscretizer - Feature Selectors - VectorSlicer - RFormula - ChiSqSelector + Feature Selectors + VectorSlicer + RFormula + ChiSqSelector - Locality Sensitive Hashing - LSH Operations - Feature Transformation - Approximate Similarity Join - Approximate Nearest Neighbor Search + Locality Sensitive Hashing + LSH Operations + Feature Transformation + Approximate Similarity Join + Approximate Nearest Neighbor Search - LSH Algorithms - Bucketed Random Projection for Euclidean Distance - MinHash for Jaccard Distance + LSH Algorithms + Bucketed Random Projection for Euclidean Distance + MinHash for Jaccard Distance @@ -395,7 +395,7 @@ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). There are several variants on the definition of term frequency and document frequency. In MLlib, we separate TF and IDF to make them flexible. 
-TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors. +TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors. HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a set of terms might be a bag of words. @@ -437,7 +437,7 @@ when using text as features. Our feature vectors could then be passed to a lear Refer to the HashingTF Scala docs and the IDF Scala docs for more details on the API. -import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} +import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} val sentenceData = spark.createDataFrame(Seq( (0.0, Hi I heard about Spark), @@ -468,7 +468,7 @@ the IDF Scala doc Refer to the HashingTF Java docs and the IDF Java docs for more details on the API. -import java.util.Arrays; +import java.util.Arrays; import java.util.List; import org.apache.spark.ml.feature.HashingTF; @@ -489,17 +489,17 @@ the IDF Scala doc RowFactory.create(0.0, I wish Java could use case classes), RowFactory.create(1.0, Logistic regression models are neat) ); -StructType schema = new StructType(new StructField[]{ - new StructField(label, DataTypes.DoubleType, false, Metadata.empty()), - new StructField(sentence, DataTypes.StringType, false, Metadata.empty()) +StructType schema = new StructType(new StructField[]{ + new StructField(label, DataTypes.DoubleType, false, Metadata.empty()), + new StructField(sentence, DataTypes.StringType, false, Metadata.empty()) }); DatasetRow sentenceData = spark.createDataFrame(data, schema); -Tokenizer tokenizer = new Tokenizer().setInputCol(sentence).setOutputCol(words); +Tokenizer tokenizer = new Tokenizer().setInputCol(sentence).setOutputCol(words); DatasetRow wordsData = tokenizer.transform(sentenceData); int numFeatures = 20; -HashingTF hashingTF = new HashingTF() +HashingTF hashingTF = new HashingTF() .setInputCol(words) .setOutputCol(rawFeatures) 
.setNumFeatures(numFeatures); @@ -507,7 +507,7 @@ the IDF Scala doc DatasetRow featurizedData = hashingTF.transform(wordsData); // alternatively, CountVectorizer can also be used to get term frequency vectors -IDF idf = new IDF().setInputCol(rawFeatures).setOutputCol(features); +IDF
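HashingTF is described above as a Transformer that maps a set of terms to a fixed-length feature vector. The idea behind it, the hashing trick, can be sketched in plain Python: hash each term, take it modulo numFeatures to get a vector index, and count. This is an illustration only. It uses zlib.crc32 as a deterministic stand-in hash, whereas Spark's HashingTF uses MurmurHash3, so the indices will not match Spark's output. The numFeatures value of 20 follows the Java example.

```python
import zlib


def hashing_tf(terms, num_features=20):
    """Map a bag of terms to a fixed-length count vector via the hashing trick.

    zlib.crc32 is a stand-in hash chosen for determinism; it is not the
    hash Spark uses, so indices differ from Spark's HashingTF.
    """
    vector = [0.0] * num_features
    for term in terms:
        index = zlib.crc32(term.encode("utf-8")) % num_features
        vector[index] += 1.0  # distinct terms may collide on the same index
    return vector


words = "hi i heard about spark".split()
v = hashing_tf(words)
print(len(v), sum(v))  # 20 5.0
```

The vector length stays fixed however large the vocabulary grows, at the cost of occasional hash collisions. That trade-off is why numFeatures is usually set well above the expected number of distinct terms.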
[22/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-classification-regression.html -- diff --git a/site/docs/2.1.0/ml-classification-regression.html b/site/docs/2.1.0/ml-classification-regression.html index 1e0665b..0b264bb 100644 --- a/site/docs/2.1.0/ml-classification-regression.html +++ b/site/docs/2.1.0/ml-classification-regression.html @@ -329,58 +329,58 @@ discussing specific classes of algorithms, such as linear methods, trees, and en Table of Contents - Classification - Logistic regression - Binomial logistic regression - Multinomial logistic regression + Classification + Logistic regression + Binomial logistic regression + Multinomial logistic regression - Decision tree classifier - Random forest classifier - Gradient-boosted tree classifier - Multilayer perceptron classifier - One-vs-Rest classifier (a.k.a. One-vs-All) - Naive Bayes + Decision tree classifier + Random forest classifier + Gradient-boosted tree classifier + Multilayer perceptron classifier + One-vs-Rest classifier (a.k.a. 
One-vs-All) + Naive Bayes - Regression - Linear regression - Generalized linear regression - Available families + Regression + Linear regression + Generalized linear regression + Available families - Decision tree regression - Random forest regression - Gradient-boosted tree regression - Survival regression - Isotonic regression - Examples + Decision tree regression + Random forest regression + Gradient-boosted tree regression + Survival regression + Isotonic regression + Examples - Linear methods - Decision trees - Inputs and Outputs - Input Columns - Output Columns + Linear methods + Decision trees + Inputs and Outputs + Input Columns + Output Columns - Tree Ensembles - Random Forests - Inputs and Outputs - Input Columns - Output Columns (Predictions) + Tree Ensembles + Random Forests + Inputs and Outputs + Input Columns + Output Columns (Predictions) - Gradient-Boosted Trees (GBTs) - Inputs and Outputs - Input Columns - Output Columns (Predictions) + Gradient-Boosted Trees (GBTs) + Inputs and Outputs + Input Columns + Output Columns (Predictions) @@ -407,7 +407,7 @@ parameter to select between these two algorithms, or leave it unset and Spark wi Binomial logistic regression -For more background and more details about the implementation of binomial logistic regression, refer to the documentation of logistic regression in spark.mllib. +For more background and more details about the implementation of binomial logistic regression, refer to the documentation of logistic regression in spark.mllib. Example @@ -421,7 +421,7 @@ $\alpha$ and regParam corresponds to $\lambda$. More details on parameters can be found in the Scala API documentation. -import org.apache.spark.ml.classification.LogisticRegression +import org.apache.spark.ml.classification.LogisticRegression // Load training data val training = spark.read.format(libsvm).load(data/mllib/sample_libsvm_data.txt) @@ -435,7 +435,7 @@ $\alpha$ and regParam corresponds to $\lambda$. 
val lrModel = lr.fit(training) // Print the coefficients and intercept for logistic regression -println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}") +println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}") // We can also use the multinomial family for binary classification val mlr = new LogisticRegression() @@ -447,8 +447,8 @@ $\alpha$ and regParam corresponds to $\lambda$. val mlrModel = mlr.fit(training) // Print the coefficients and intercepts for logistic regression with multinomial family -println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}") -println(s"Multinomial intercepts: ${mlrModel.interceptVector}") +println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}") +println(s"Multinomial intercepts: ${mlrModel.interceptVector}") Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" in the Spark repo. @@ -457,7 +457,7 @@ $\alpha$ and regParam corresponds to $\lambda$.
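The logistic regression example notes that elasticNetParam corresponds to alpha and regParam to lambda. The following plain-Python sketch shows how the two parameters combine; it is not Spark's optimizer. It evaluates a binomial logistic objective: the mean log loss plus an elastic net penalty, written in the common form lambda * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2). In this sketch the intercept is left unpenalized, and the parameter values are illustrative.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """lambda * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2 = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2)


def objective(weights, intercept, rows, reg_param, elastic_net_param):
    """Mean logistic loss over (features, label) rows plus the penalty (intercept unpenalized)."""
    loss = 0.0
    for features, label in rows:
        p = sigmoid(sum(w * x for w, x in zip(weights, features)) + intercept)
        loss -= label * math.log(p) + (1 - label) * math.log(1 - p)
    return loss / len(rows) + elastic_net_penalty(weights, reg_param, elastic_net_param)


rows = [([1.0, 2.0], 1.0), ([0.5, -1.0], 0.0)]
print(elastic_net_penalty([1.0, -2.0], reg_param=0.3, elastic_net_param=0.8))
print(objective([0.0, 0.0], 0.0, rows, reg_param=0.3, elastic_net_param=0.8))
```

Setting elastic_net_param to 1.0 leaves a pure L1 (lasso) penalty, and 0.0 leaves a pure L2 (ridge) penalty. With all-zero weights the penalty vanishes and the objective reduces to log(2).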
[05/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/storage-openstack-swift.html -- diff --git a/site/docs/2.1.0/storage-openstack-swift.html b/site/docs/2.1.0/storage-openstack-swift.html index bbb3446..a20c67f 100644 --- a/site/docs/2.1.0/storage-openstack-swift.html +++ b/site/docs/2.1.0/storage-openstack-swift.html @@ -144,7 +144,7 @@ Current Swift driver requires Swift to use Keystone authentication method. The Spark application should include hadoop-openstack dependency. For example, for Maven support, add the following to the pom.xml file: -dependencyManagement +dependencyManagement ... dependency groupIdorg.apache.hadoop/groupId @@ -152,15 +152,15 @@ For example, for Maven support, add the following to the pom.xml fi version2.3.0/version /dependency ... -/dependencyManagement +/dependencyManagement Configuration Parameters Create core-site.xml and place it inside Sparks conf directory. There are two main categories of parameters that should to be configured: declaration of the -Swift driver and the parameters that are required by Keystone. +Swift driver and the parameters that are required by Keystone. -Configuration of Hadoop to use Swift File system achieved via +Configuration of Hadoop to use Swift File system achieved via Property NameValue @@ -221,7 +221,7 @@ contains a list of Keystone mandatory parameters. PROVIDER can be a For example, assume PROVIDER=SparkTest and Keystone contains user tester with password testing defined for tenant test. Then core-site.xml should include: -configuration +configuration property namefs.swift.impl/name valueorg.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem/value @@ -257,7 +257,7 @@ defined for tenant test. 
Then core-site.xml should inc namefs.swift.service.SparkTest.password/name valuetesting/value /property -/configuration +/configuration Notice that fs.swift.service.PROVIDER.tenant, http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-custom-receivers.html -- diff --git a/site/docs/2.1.0/streaming-custom-receivers.html b/site/docs/2.1.0/streaming-custom-receivers.html index d31647d..846c797 100644 --- a/site/docs/2.1.0/streaming-custom-receivers.html +++ b/site/docs/2.1.0/streaming-custom-receivers.html @@ -171,7 +171,7 @@ has any error connecting or receiving, the receiver is restarted to make another -class CustomReceiver(host: String, port: Int) +class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { @@ -216,12 +216,12 @@ has any error connecting or receiving, the receiver is restarted to make another restart(Error receiving data, t) } } -} +} -public class JavaCustomReceiver extends ReceiverString { +public class JavaCustomReceiver extends ReceiverString { String host = null; int port = -1; @@ -234,7 +234,7 @@ has any error connecting or receiving, the receiver is restarted to make another public void onStart() { // Start the thread that receives data over a connection -new Thread() { +new Thread() { @Override public void run() { receive(); } @@ -253,10 +253,10 @@ has any error connecting or receiving, the receiver is restarted to make another try { // connect to the server - socket = new Socket(host, port); + socket = new Socket(host, port); - BufferedReader reader = new BufferedReader( -new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); + BufferedReader reader = new BufferedReader( +new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading while (!isStopped() (userInput = reader.readLine()) != null) { @@ -276,7 +276,7 @@ has any error connecting or 
receiving, the receiver is restarted to make another restart("Error receiving data", t); } } -} +} @@ -290,20 +290,20 @@ an input DStream using data received by the instance of custom receiver, as show -// Assuming ssc is the StreamingContext +// Assuming ssc is the StreamingContext val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = lines.flatMap(_.split(" ")) -... +... The full source code is in the example CustomReceiver.scala (https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala). -// Assuming ssc is the JavaStreamingContext -JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port)); +// Assuming ssc is the JavaStreamingContext
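The Java receiver above reads lines until the receiver is stopped or the connection is closed, storing each line as it arrives. The following plain-Python sketch models only that loop, under stated assumptions: `receive_lines`, `is_stopped` and `store` are hypothetical stand-ins for the receiver's `isStopped()` and `store()` methods, and an in-memory stream replaces the socket.

```python
import io
from typing import Callable, TextIO


def receive_lines(reader: TextIO,
                  is_stopped: Callable[[], bool],
                  store: Callable[[str], None]) -> int:
    """Read lines until stopped or the stream ends; return how many were stored."""
    stored = 0
    # Check the stop flag before each read, as the Java while-condition does.
    while not is_stopped():
        line = reader.readline()
        if line == "":
            # End of stream: the Java version sees readLine() return null here.
            break
        store(line.rstrip("\n"))
        stored += 1
    return stored


# Usage: an in-memory stream stands in for socket.getInputStream().
received = []
count = receive_lines(io.StringIO("hello world\nsecond line\n"),
                      lambda: False, received.append)
print(count, received)
```

A real receiver would also call `restart(...)` when the connection breaks, as the catch blocks in the diff do; the sketch leaves that decision to the caller.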
[21/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-clustering.html -- diff --git a/site/docs/2.1.0/ml-clustering.html b/site/docs/2.1.0/ml-clustering.html index e225281..df38605 100644 --- a/site/docs/2.1.0/ml-clustering.html +++ b/site/docs/2.1.0/ml-clustering.html @@ -313,21 +313,21 @@ about these algorithms. Table of Contents - K-means - Input Columns - Output Columns - Example + K-means + Input Columns + Output Columns + Example - Latent Dirichlet allocation (LDA) - Bisecting k-means - Example + Latent Dirichlet allocation (LDA) + Bisecting k-means + Example - Gaussian Mixture Model (GMM) - Input Columns - Output Columns - Example + Gaussian Mixture Model (GMM) + Input Columns + Output Columns + Example @@ -391,7 +391,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea Refer to the Scala API docs for more details. -import org.apache.spark.ml.clustering.KMeans +import org.apache.spark.ml.clustering.KMeans // Loads data. val dataset = spark.read.format(libsvm).load(data/mllib/sample_kmeans_data.txt) @@ -402,7 +402,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea // Evaluate clustering by computing Within Set Sum of Squared Errors. val WSSSE = model.computeCost(dataset) -println(sWithin Set Sum of Squared Errors = $WSSSE) +println(sWithin Set Sum of Squared Errors = $WSSSE) // Shows the result. println(Cluster Centers: ) @@ -414,7 +414,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea Refer to the Java API docs for more details. 
-import org.apache.spark.ml.clustering.KMeansModel; +import org.apache.spark.ml.clustering.KMeansModel; import org.apache.spark.ml.clustering.KMeans; import org.apache.spark.ml.linalg.Vector; import org.apache.spark.sql.Dataset; @@ -424,7 +424,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt"); // Trains a k-means model. -KMeans kmeans = new KMeans().setK(2).setSeed(1L); +KMeans kmeans = new KMeans().setK(2).setSeed(1L); KMeansModel model = kmeans.fit(dataset); // Evaluate clustering by computing Within Set Sum of Squared Errors. @@ -434,7 +434,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea // Shows the result. Vector[] centers = model.clusterCenters(); System.out.println("Cluster Centers: "); -for (Vector center: centers) { +for (Vector center: centers) { System.out.println(center); } @@ -444,22 +444,22 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea Refer to the Python API docs for more details. -from pyspark.ml.clustering import KMeans +from pyspark.ml.clustering import KMeans -# Loads data. -dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") +# Loads data. +dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") -# Trains a k-means model. +# Trains a k-means model. kmeans = KMeans().setK(2).setSeed(1) model = kmeans.fit(dataset) -# Evaluate clustering by computing Within Set Sum of Squared Errors. +# Evaluate clustering by computing Within Set Sum of Squared Errors. wssse = model.computeCost(dataset) -print("Within Set Sum of Squared Errors = " + str(wssse)) +print("Within Set Sum of Squared Errors = " + str(wssse)) -# Shows the result.
centers = model.clusterCenters() -print("Cluster Centers: ") +print("Cluster Centers: ") for center in centers: print(center) @@ -470,7 +470,7 @@ called http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf;>kmea Refer to the R API docs for more details. -# Fit a k-means model with spark.kmeans +# Fit a k-means model with spark.kmeans irisDF <- suppressWarnings(createDataFrame(iris)) kmeansDF <- irisDF kmeansTestDF <- irisDF @@ -504,7 +504,7 @@ and generates a LDAModel as the base model. Expert users may cast a Refer to the Scala API docs for more details. -import org.apache.spark.ml.clustering.LDA +import org.apache.spark.ml.clustering.LDA // Loads data. val dataset = spark.read.format("libsvm") @@ -516,8 +516,8 @@ and generates a LDAModel as the base model. Expert users may cast a val ll = model.logLikelihood(dataset) val lp = model.logPerplexity(dataset) -println(s"The lower bound on the log likelihood of the entire corpus: $ll") -println(s"The upper bound bound on perplexity: $lp") +println(s"The lower bound on the log likelihood of the entire corpus: $ll") +println(s"The upper bound bound on perplexity: $lp") // Describe topics. val topics = model.describeTopics(3) @@ -535,7 +535,7 @@ and generates a LDAModel as the base
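The k-means examples above print the Within Set Sum of Squared Errors returned by `computeCost`: each point contributes its squared Euclidean distance to the nearest cluster center. A minimal plain-Python sketch of that quantity, using a made-up four-point dataset rather than the sample file; `wssse` and `squared_distance` are illustrative helpers, not Spark API.

```python
from typing import Sequence


def squared_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def wssse(points: Sequence[Sequence[float]],
          centers: Sequence[Sequence[float]]) -> float:
    """Within Set Sum of Squared Errors: each point is charged to its nearest center."""
    return sum(min(squared_distance(p, c) for c in centers) for p in points)


points = [(0.0, 0.0), (1.0, 0.0), (10.0, 10.0), (11.0, 10.0)]
centers = [(0.5, 0.0), (10.5, 10.0)]
print("Within Set Sum of Squared Errors = " + str(wssse(points, centers)))
```

Lower values mean tighter clusters, but the measure generally shrinks as k grows, so it is more useful for comparing runs than as a target on its own.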
[02/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/structured-streaming-programming-guide.html -- diff --git a/site/docs/2.1.0/structured-streaming-programming-guide.html b/site/docs/2.1.0/structured-streaming-programming-guide.html index e54c101..3a1ac5f 100644 --- a/site/docs/2.1.0/structured-streaming-programming-guide.html +++ b/site/docs/2.1.0/structured-streaming-programming-guide.html @@ -127,45 +127,50 @@ - Overview - Quick Example - Programming Model - Basic Concepts - Handling Event-time and Late Data - Fault Tolerance Semantics + Overview + Quick Example + Programming Model + Basic Concepts + Handling Event-time and Late Data + Fault Tolerance Semantics - API using Datasets and DataFrames - Creating streaming DataFrames and streaming Datasets - Data Sources - Schema inference and partition of streaming DataFrames/Datasets + API using Datasets and DataFrames + Creating streaming DataFrames and streaming Datasets + Data Sources + Schema inference and partition of streaming DataFrames/Datasets - Operations on streaming DataFrames/Datasets - Basic Operations - Selection, Projection, Aggregation - Window Operations on Event Time - Join Operations - Unsupported Operations + Operations on streaming DataFrames/Datasets + Basic Operations - Selection, Projection, Aggregation + Window Operations on Event Time + Handling Late Data and Watermarking + Join Operations + Unsupported Operations - Starting Streaming Queries - Output Modes - Output Sinks - Using Foreach + Starting Streaming Queries + Output Modes + Output Sinks + Using Foreach - Managing Streaming Queries - Monitoring Streaming Queries - Recovering from Failures with Checkpointing + Managing Streaming Queries + Monitoring Streaming Queries + Interactive APIs + Asynchronous API + + + Recovering from Failures with Checkpointing - Where to go from here + Where to go from here Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the 
Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming. -Spark 2.0 is the ALPHA RELEASE of Structured Streaming and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. +Structured Streaming is still ALPHA in Spark 2.1 and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. Quick Example Let's say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let's see how you can express this using Structured Streaming.
You can see the full code in @@ -175,7 +180,7 @@ And if you http://spark.apache.org/downloads.html;>download Spark, -import org.apache.spark.sql.functions._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession @@ -183,12 +188,12 @@ And if you http://spark.apache.org/downloads.html;>download Spark, .appName("StructuredNetworkWordCount") .getOrCreate() -import spark.implicits._ +import spark.implicits._ -import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; @@ -198,19 +203,19 @@ And if you http://spark.apache.org/downloads.html;>download Spark, SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") - .getOrCreate();
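The overview above says the engine runs the streaming word count incrementally, updating the final result as new data arrives. As a rough mental model only (not how Spark stores state), the sketch below keeps a running count in a `Counter` and folds in each micro-batch of lines; `update_running_counts` is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def update_running_counts(state: Counter, batch: Iterable[str]) -> Counter:
    """Fold one micro-batch of text lines into the running word counts."""
    for line in batch:
        # Split each line on spaces, as the word-count example does.
        state.update(line.split(" "))
    return state


running = Counter()
update_running_counts(running, ["apache spark", "apache hadoop"])
update_running_counts(running, ["spark streaming"])
# The result reflects all data seen so far, not just the latest batch.
print(dict(running))
```

Emitting the whole updated table after every batch is roughly what the guide's Complete output mode does for this query.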
[10/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-pmml-model-export.html -- diff --git a/site/docs/2.1.0/mllib-pmml-model-export.html b/site/docs/2.1.0/mllib-pmml-model-export.html index 30815e0..3f2fd91 100644 --- a/site/docs/2.1.0/mllib-pmml-model-export.html +++ b/site/docs/2.1.0/mllib-pmml-model-export.html @@ -307,8 +307,8 @@ - spark.mllib supported models - Examples + spark.mllib supported models + Examples spark.mllib supported models @@ -353,32 +353,31 @@ Refer to the KMeans Scala docs and Vectors Scala docs for details on the API. -Here a complete example of building a KMeansModel and print it out in PMML format: -import org.apache.spark.mllib.clustering.KMeans -import org.apache.spark.mllib.linalg.Vectors +Here a complete example of building a KMeansModel and print it out in PMML format: +div class="highlight"preimport org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.mllib.linalg.Vectors -// Load and parse the data +// Load and parse the data val data = sc.textFile(data/mllib/kmeans_data.txt) -val parsedData = data.map(s = Vectors.dense(s.split( ).map(_.toDouble))).cache() +val parsedData = data.map(s = Vectors.dense(s.split( ).map(_.toDouble))).cache() -// Cluster the data into two classes using KMeans +// Cluster the data into two classes using KMeans val numClusters = 2 val numIterations = 20 -val clusters = KMeans.train(parsedData, numClusters, numIterations) +val clusters = KMeans.train(parsedData, numClusters, numIterations) -// Export to PMML to a String in PMML format -println(PMML Model:\n + clusters.toPMML) +// Export to PMML to a String in PMML format +println(PMML Model:\n + clusters.toPMML) -// Export the model to a local file in PMML format -clusters.toPMML(/tmp/kmeans.xml) +// Export the model to a local file in PMML format +clusters.toPMML(/tmp/kmeans.xml) -// Export the model to a directory on a distributed file system in PMML format -clusters.toPMML(sc, /tmp/kmeans) +// Export the 
model to a directory on a distributed file system in PMML format +clusters.toPMML(sc, /tmp/kmeans) -// Export the model to the OutputStream in PMML format +// Export the model to the OutputStream in PMML format clusters.toPMML(System.out) - -Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/PMMLModelExportExample.scala" in the Spark repo. +/pre/divdivFind full example code at examples/src/main/scala/org/apache/spark/examples/mllib/PMMLModelExportExample.scala in the Spark repo./div For unsupported models, either you will not find a .toPMML method or an IllegalArgumentException will be thrown. http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-statistics.html -- diff --git a/site/docs/2.1.0/mllib-statistics.html b/site/docs/2.1.0/mllib-statistics.html index 4485ecf..f04924c 100644 --- a/site/docs/2.1.0/mllib-statistics.html +++ b/site/docs/2.1.0/mllib-statistics.html @@ -358,15 +358,15 @@ - Summary statistics - Correlations - Stratified sampling - Hypothesis testing - Streaming Significance Testing + Summary statistics + Correlations + Stratified sampling + Hypothesis testing + Streaming Significance Testing - Random data generation - Kernel density estimation + Random data generation + Kernel density estimation \[ @@ -401,7 +401,7 @@ total count. Refer to the MultivariateStatisticalSummary Scala docs for details on the API. -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} val observations = sc.parallelize( @@ -430,7 +430,7 @@ total count. Refer to the MultivariateStatisticalSummary Java docs for details on the API. -import java.util.Arrays; +import java.util.Arrays; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.mllib.linalg.Vector; @@ -463,19 +463,19 @@ total count. Refer to the MultivariateStatisticalSummary Python docs for more details on the API. 
-import numpy as np +import numpy as np from pyspark.mllib.stat import Statistics mat = sc.parallelize( [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])] -) # an RDD of Vectors +) # an RDD of Vectors -# Compute column summary statistics. +# Compute column summary statistics. summary = Statistics.colStats(mat) -print(summary.mean()) # a dense vector containing the mean value for each column -print(summary.variance()) # column-wise variance -print(summary.numNonzeros()) # number of nonzeros in
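The `colStats` example above returns a column-wise summary of an RDD of vectors. For the same three rows, here is a plain-Python sketch of three of the reported statistics. It assumes, as we understand Spark's summarizer to do, that the variance is the unbiased sample variance (dividing by n - 1); `column_summary` is an illustrative helper, not part of `pyspark`.

```python
from typing import List, Sequence, Tuple


def column_summary(rows: Sequence[Sequence[float]]) -> Tuple[List[float], List[float], List[int]]:
    """Return (mean, sample variance, count of nonzeros) for each column."""
    n = len(rows)
    if n < 2:
        raise ValueError("need at least two rows for a sample variance")
    columns = list(zip(*rows))  # transpose rows into columns
    mean = [sum(col) / n for col in columns]
    variance = [sum((x - m) ** 2 for x in col) / (n - 1)
                for col, m in zip(columns, mean)]
    num_nonzeros = [sum(1 for x in col if x != 0) for col in columns]
    return mean, variance, num_nonzeros


rows = [[1.0, 10.0, 100.0], [2.0, 20.0, 200.0], [3.0, 30.0, 300.0]]
mean, variance, nnz = column_summary(rows)
print(mean)      # [2.0, 20.0, 200.0]
print(variance)  # [1.0, 100.0, 10000.0]
print(nnz)       # [3, 3, 3]
```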
[09/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/programming-guide.html -- diff --git a/site/docs/2.1.0/programming-guide.html b/site/docs/2.1.0/programming-guide.html index 12458af..0e06e86 100644 --- a/site/docs/2.1.0/programming-guide.html +++ b/site/docs/2.1.0/programming-guide.html @@ -129,50 +129,50 @@ - Overview - Linking with Spark - Initializing Spark - Using the Shell + Overview + Linking with Spark + Initializing Spark + Using the Shell - Resilient Distributed Datasets (RDDs) - Parallelized Collections - External Datasets - RDD Operations - Basics - Passing Functions to Spark - Understanding closures - Example - Local vs. cluster modes - Printing elements of an RDD + Resilient Distributed Datasets (RDDs) + Parallelized Collections + External Datasets + RDD Operations + Basics + Passing Functions to Spark + Understanding closures + Example + Local vs. cluster modes + Printing elements of an RDD - Working with Key-Value Pairs - Transformations - Actions - Shuffle operations - Background - Performance Impact + Working with Key-Value Pairs + Transformations + Actions + Shuffle operations + Background + Performance Impact - RDD Persistence - Which Storage Level to Choose? - Removing Data + RDD Persistence + Which Storage Level to Choose? + Removing Data - Shared Variables - Broadcast Variables - Accumulators + Shared Variables + Broadcast Variables + Accumulators - Deploying to a Cluster - Launching Spark jobs from Java / Scala - Unit Testing - Where to Go from Here + Deploying to a Cluster + Launching Spark jobs from Java / Scala + Unit Testing + Where to Go from Here Overview @@ -212,8 +212,8 @@ version = your-hdfs-version Finally, you need to import some Spark classes into your program. 
Add the following lines: -import org.apache.spark.SparkContext -import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.SparkConf (Before Spark 1.3.0, you need to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.) @@ -245,9 +245,9 @@ version = your-hdfs-version Finally, you need to import some Spark classes into your program. Add the following lines: -import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD -import org.apache.spark.SparkConf +import org.apache.spark.SparkConf @@ -269,13 +269,13 @@ for common HDFS versions. Finally, you need to import some Spark classes into your program. Add the following line: -from pyspark import SparkContext, SparkConf +from pyspark import SparkContext, SparkConf PySpark requires the same minor version of Python in both driver and workers. It uses the default python version in PATH, you can specify which version of Python you want to use by PYSPARK_PYTHON, for example: -$ PYSPARK_PYTHON=python3.4 bin/pyspark -$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py +$ PYSPARK_PYTHON=python3.4 bin/pyspark +$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py @@ -293,8 +293,8 @@ that contains information about your application. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. -val conf = new SparkConf().setAppName(appName).setMaster(master) -new SparkContext(conf) +val conf = new SparkConf().setAppName(appName).setMaster(master) +new SparkContext(conf) @@ -304,8 +304,8 @@ that contains information about your application. how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application. 
-SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); -JavaSparkContext sc = new JavaSparkContext(conf); +SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); +JavaSparkContext sc = new JavaSparkContext(conf); @@ -315,8 +315,8 @@ that contains information about your application. how to access a cluster. To create a SparkContext you first need to build a SparkConf object that
[07/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/sparkr.html -- diff --git a/site/docs/2.1.0/sparkr.html b/site/docs/2.1.0/sparkr.html index 0a1a347..e861a01 100644 --- a/site/docs/2.1.0/sparkr.html +++ b/site/docs/2.1.0/sparkr.html @@ -127,53 +127,53 @@ - Overview - SparkDataFrame - Starting Up: SparkSession - Starting Up from RStudio - Creating SparkDataFrames - From local data frames - From Data Sources - From Hive tables + Overview + SparkDataFrame + Starting Up: SparkSession + Starting Up from RStudio + Creating SparkDataFrames + From local data frames + From Data Sources + From Hive tables - SparkDataFrame Operations - Selecting rows, columns - Grouping, Aggregation - Operating on Columns - Applying User-Defined Function - Run a given function on a large dataset using dapply or dapplyCollect - dapply - dapplyCollect + SparkDataFrame Operations + Selecting rows, columns + Grouping, Aggregation + Operating on Columns + Applying User-Defined Function + Run a given function on a large dataset using dapply or dapplyCollect + dapply + dapplyCollect - Run a given function on a large dataset grouping by input column(s) and using gapply or gapplyCollect - gapply - gapplyCollect + Run a given function on a large dataset grouping by input column(s) and using gapply or gapplyCollect + gapply + gapplyCollect - Data type mapping between R and Spark - Run local R functions distributed using spark.lapply - spark.lapply + Data type mapping between R and Spark + Run local R functions distributed using spark.lapply + spark.lapply - Running SQL Queries from SparkR + Running SQL Queries from SparkR - Machine Learning - Algorithms - Model persistence + Machine Learning + Algorithms + Model persistence - R Function Name Conflicts - Migration Guide - Upgrading From SparkR 1.5.x to 1.6.x - Upgrading From SparkR 1.6.x to 2.0 - Upgrading to SparkR 2.1.0 + R Function Name Conflicts + Migration Guide + Upgrading From SparkR 1.5.x to 1.6.x + Upgrading 
From SparkR 1.6.x to 2.0 + Upgrading to SparkR 2.1.0 @@ -202,7 +202,7 @@ You can create a SparkSession using sparkR.session and -sparkR.session() +sparkR.session() @@ -223,11 +223,11 @@ them, pass them as you would other configuration properties in the sparkCo -if (nchar(Sys.getenv(SPARK_HOME)) 1) { +if (nchar(Sys.getenv(SPARK_HOME)) 1) { Sys.setenv(SPARK_HOME = /home/spark) } library(SparkR, lib.loc = c(file.path(Sys.getenv(SPARK_HOME), R, lib))) -sparkR.session(master = local[*], sparkConfig = list(spark.driver.memory = 2g)) +sparkR.session(master = local[*], sparkConfig = list(spark.driver.memory = 2g)) @@ -282,14 +282,14 @@ sparkR.session(master = - df - as.DataFrame(faithful) + df - as.DataFrame(faithful) # Displays the first part of the SparkDataFrame head(df) ## eruptions waiting ##1 3.600 79 ##2 1.800 54 -##3 3.333 74 +##3 3.333 74 @@ -303,7 +303,7 @@ specifying --packages with spark-submit or spark - sparkR.session(sparkPackages = com.databricks:spark-avro_2.11:3.0.0) + sparkR.session(sparkPackages = com.databricks:spark-avro_2.11:3.0.0) @@ -311,7 +311,7 @@ specifying --packages with spark-submit or spark - people - read.df(./examples/src/main/resources/people.json, json) + people - read.df(./examples/src/main/resources/people.json, json) head(people) ## agename ##1 NA Michael @@ -325,7 +325,7 @@ printSchema(people) # |-- name: string (nullable = true) # Similarly, multiple files can be read with read.json -people - read.json(c(./examples/src/main/resources/people.json, ./examples/src/main/resources/people2.json)) +people - read.json(c(./examples/src/main/resources/people.json, ./examples/src/main/resources/people2.json)) @@ -333,7 +333,7 @@ people - read.json( - df - read.df(csvPath, csv, header = true, inferSchema = true, na.strings = NA) + df - read.df(csvPath, csv, header
[18/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/ml-tuning.html -- diff --git a/site/docs/2.1.0/ml-tuning.html b/site/docs/2.1.0/ml-tuning.html index 0c36a98..2246cc2 100644 --- a/site/docs/2.1.0/ml-tuning.html +++ b/site/docs/2.1.0/ml-tuning.html @@ -329,13 +329,13 @@ Built-in Cross-Validation and other tooling allow users to optimize hyperparamet Table of contents - Model selection (a.k.a. hyperparameter tuning) - Cross-Validation - Example: model selection via cross-validation + Model selection (a.k.a. hyperparameter tuning) + Cross-Validation + Example: model selection via cross-validation - Train-Validation Split - Example: model selection via train validation split + Train-Validation Split + Example: model selection via train validation split @@ -396,7 +396,7 @@ However, it is also a well-established method for choosing parameters which is m Refer to the [`CrossValidator` Scala docs](api/scala/index.html#org.apache.spark.ml.tuning.CrossValidator) for details on the API. -import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{HashingTF, Tokenizer} @@ -467,7 +467,7 @@ Refer to the [`CrossValidator` Scala docs](api/scala/index.html#org.apache.spark .select(id, text, probability, prediction) .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) = -println(s($id, $text) -- prob=$prob, prediction=$prediction) +println(s($id, $text) -- prob=$prob, prediction=$prediction) } Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala" in the Spark repo. 
@@ -476,7 +476,7 @@ Refer to the [`CrossValidator` Scala docs](api/scala/index.html#org.apache.spark Refer to the [`CrossValidator` Java docs](api/java/org/apache/spark/ml/tuning/CrossValidator.html) for details on the API. -import java.util.Arrays; +import java.util.Arrays; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineStage; @@ -493,38 +493,38 @@ Refer to the [`CrossValidator` Java docs](api/java/org/apache/spark/ml/tuning/Cr // Prepare training documents, which are labeled. DatasetRow training = spark.createDataFrame(Arrays.asList( - new JavaLabeledDocument(0L, a b c d e spark, 1.0), - new JavaLabeledDocument(1L, b d, 0.0), - new JavaLabeledDocument(2L,spark f g h, 1.0), - new JavaLabeledDocument(3L, hadoop mapreduce, 0.0), - new JavaLabeledDocument(4L, b spark who, 1.0), - new JavaLabeledDocument(5L, g d a y, 0.0), - new JavaLabeledDocument(6L, spark fly, 1.0), - new JavaLabeledDocument(7L, was mapreduce, 0.0), - new JavaLabeledDocument(8L, e spark program, 1.0), - new JavaLabeledDocument(9L, a e c l, 0.0), - new JavaLabeledDocument(10L, spark compile, 1.0), - new JavaLabeledDocument(11L, hadoop software, 0.0) + new JavaLabeledDocument(0L, a b c d e spark, 1.0), + new JavaLabeledDocument(1L, b d, 0.0), + new JavaLabeledDocument(2L,spark f g h, 1.0), + new JavaLabeledDocument(3L, hadoop mapreduce, 0.0), + new JavaLabeledDocument(4L, b spark who, 1.0), + new JavaLabeledDocument(5L, g d a y, 0.0), + new JavaLabeledDocument(6L, spark fly, 1.0), + new JavaLabeledDocument(7L, was mapreduce, 0.0), + new JavaLabeledDocument(8L, e spark program, 1.0), + new JavaLabeledDocument(9L, a e c l, 0.0), + new JavaLabeledDocument(10L, spark compile, 1.0), + new JavaLabeledDocument(11L, hadoop software, 0.0) ), JavaLabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. 
-Tokenizer tokenizer = new Tokenizer() +Tokenizer tokenizer = new Tokenizer() .setInputCol(text) .setOutputCol(words); -HashingTF hashingTF = new HashingTF() +HashingTF hashingTF = new HashingTF() .setNumFeatures(1000) .setInputCol(tokenizer.getOutputCol()) .setOutputCol(features); -LogisticRegression lr = new LogisticRegression() +LogisticRegression lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.01); -Pipeline pipeline = new Pipeline() +Pipeline pipeline = new Pipeline() .setStages(new PipelineStage[] {tokenizer, hashingTF, lr}); // We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. -ParamMap[] paramGrid = new ParamGridBuilder() +ParamMap[] paramGrid = new ParamGridBuilder() .addGrid(hashingTF.numFeatures(), new int[] {10, 100, 1000}) .addGrid(lr.regParam(), new double[] {0.1, 0.01}) .build(); @@ -534,9 +534,9 @@ Refer to the [`CrossValidator`
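The `ParamGridBuilder` comment above counts 3 x 2 = 6 settings because the grid is the Cartesian product of the listed values. A small sketch of that expansion, using plain dictionaries keyed by illustrative parameter labels instead of Spark `ParamMap` objects:

```python
from itertools import product
from typing import Any, Dict, List, Sequence


def build_param_grid(grid: Dict[str, Sequence[Any]]) -> List[Dict[str, Any]]:
    """Expand {param: candidate values} into every combination (Cartesian product)."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


param_grid = build_param_grid({
    "hashingTF.numFeatures": [10, 100, 1000],
    "lr.regParam": [0.1, 0.01],
})
print(len(param_grid))  # 6
for settings in param_grid:
    print(settings)
```

Cross-validation fits one model per setting and per fold, so each added parameter multiplies the training cost.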
[04/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-programming-guide.html -- diff --git a/site/docs/2.1.0/streaming-programming-guide.html b/site/docs/2.1.0/streaming-programming-guide.html index 9a87d23..b1ce1e1 100644 --- a/site/docs/2.1.0/streaming-programming-guide.html +++ b/site/docs/2.1.0/streaming-programming-guide.html @@ -129,32 +129,32 @@ - Overview - A Quick Example - Basic Concepts - Linking - Initializing StreamingContext - Discretized Streams (DStreams) - Input DStreams and Receivers - Transformations on DStreams - Output Operations on DStreams - DataFrame and SQL Operations - MLlib Operations - Caching / Persistence - Checkpointing - Accumulators, Broadcast Variables, and Checkpoints - Deploying Applications - Monitoring Applications + Overview + A Quick Example + Basic Concepts + Linking + Initializing StreamingContext + Discretized Streams (DStreams) + Input DStreams and Receivers + Transformations on DStreams + Output Operations on DStreams + DataFrame and SQL Operations + MLlib Operations + Caching / Persistence + Checkpointing + Accumulators, Broadcast Variables, and Checkpoints + Deploying Applications + Monitoring Applications - Performance Tuning - Reducing the Batch Processing Times - Setting the Right Batch Interval - Memory Tuning + Performance Tuning + Reducing the Batch Processing Times + Setting the Right Batch Interval + Memory Tuning - Fault-tolerance Semantics - Where to Go from Here + Fault-tolerance Semantics + Where to Go from Here Overview @@ -209,7 +209,7 @@ conversions from StreamingContext into our environment in order to add useful me other classes we need (like DStream). StreamingContext is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second. 
-import org.apache.spark._ +import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 @@ -217,33 +217,33 @@ main entry point for all streaming functionality. We create a local StreamingCon // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount) -val ssc = new StreamingContext(conf, Seconds(1)) +val ssc = new StreamingContext(conf, Seconds(1)) Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. ). -// Create a DStream that will connect to hostname:port, like localhost: -val lines = ssc.socketTextStream(localhost, ) +// Create a DStream that will connect to hostname:port, like localhost: +val lines = ssc.socketTextStream(localhost, ) This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space characters into words. -// Split each line into words -val words = lines.flatMap(_.split( )) +// Split each line into words +val words = lines.flatMap(_.split( )) flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words. 
-import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 +import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console -wordCounts.print() +wordCounts.print() The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. @@ -253,8 +253,8 @@ Finally, wordCounts.print() will print a few of the counts generate will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call -ssc.start() // Start the computation -ssc.awaitTermination() // Wait for the computation to terminate +ssc.start() // Start the computation +ssc.awaitTermination() // Wait for the computation to terminate The
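The DStream pipeline above (flatMap into words, map to (word, 1) pairs, then reduceByKey) computes word frequencies for each batch of data. A plain-Python sketch applying the same three steps to one batch of lines; `count_batch` is a hypothetical helper, and each call stands for one batch interval.

```python
from collections import defaultdict
from typing import Dict, Iterable


def count_batch(lines: Iterable[str]) -> Dict[str, int]:
    """Word frequencies for a single batch, mirroring flatMap -> map -> reduceByKey."""
    # flatMap: one line becomes many words
    words = [word for line in lines for word in line.split(" ")]
    # map: each word becomes a (word, 1) pair
    pairs = [(word, 1) for word in words]
    # reduceByKey(_ + _): sum the 1s for each distinct word
    counts: Dict[str, int] = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


print(count_batch(["hello world", "hello spark"]))  # {'hello': 2, 'world': 1, 'spark': 1}
print(count_batch(["hello again"]))                 # {'hello': 1, 'again': 1}
```

The second call does not remember the first, which matches the statement that the counts are computed per batch.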
[15/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-data-types.html -- diff --git a/site/docs/2.1.0/mllib-data-types.html b/site/docs/2.1.0/mllib-data-types.html index 546d921..f7b5358 100644 --- a/site/docs/2.1.0/mllib-data-types.html +++ b/site/docs/2.1.0/mllib-data-types.html @@ -307,14 +307,14 @@ - Local vector - Labeled point - Local matrix - Distributed matrix - RowMatrix - IndexedRowMatrix - CoordinateMatrix - BlockMatrix + Local vector + Labeled point + Local matrix + Distributed matrix + RowMatrix + IndexedRowMatrix + CoordinateMatrix + BlockMatrix @@ -347,14 +347,14 @@ using the factory methods implemented in Refer to the Vector Scala docs and Vectors Scala docs for details on the API. -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.{Vector, Vectors} // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries. -val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) +val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) Note: Scala imports scala.collection.immutable.Vector by default, so you have to import @@ -373,13 +373,13 @@ using the factory methods implemented in Refer to the Vector Java docs and Vectors Java docs for details on the API. -import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; // Create a dense vector (1.0, 0.0, 3.0). Vector dv = Vectors.dense(1.0, 0.0, 3.0); // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. 
-Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}); +Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}); @@ -405,18 +405,18 @@ in Ve Refer to the Vectors Python docs for more details on the API. -import numpy as np +import numpy as np import scipy.sparse as sps from pyspark.mllib.linalg import Vectors -# Use a NumPy array as a dense vector. +# Use a NumPy array as a dense vector. dv1 = np.array([1.0, 0.0, 3.0]) -# Use a Python list as a dense vector. +# Use a Python list as a dense vector. dv2 = [1.0, 0.0, 3.0] -# Create a SparseVector. +# Create a SparseVector. sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0]) -# Use a single-column SciPy csc_matrix as a sparse vector. -sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1)) +# Use a single-column SciPy csc_matrix as a sparse vector. +sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1)) @@ -438,14 +438,14 @@ For multiclass classification, labels should be class indices starting from zero Refer to the LabeledPoint Scala docs for details on the API. -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // Create a labeled point with a positive label and a dense feature vector. val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // Create a labeled point with a negative label and a sparse feature vector. -val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) +val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) @@ -456,14 +456,14 @@ For multiclass classification, labels should be class indices starting from zero Refer to the LabeledPoint Java docs for details on the API. 
-import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; // Create a labeled point with a positive label and a dense feature vector. -LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)); +LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)); // Create a labeled point with a negative label and a sparse feature vector. -LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0})); +LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0})); @@ -474,14 +474,14 @@ For multiclass classification, labels should be class indices starting from zero Refer to the LabeledPoint Python docs for more details on the API. -from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.linalg
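Each of the Scala, Java, and Python snippets above builds the same vector (1.0, 0.0, 3.0). Some build it densely. Others build it from a size plus the indices and values of its nonzero entries. The following rough illustration uses plain Python with no MLlib. `sparse_to_dense` and `sparse_from_pairs` are hypothetical helpers. It shows how the sparse encoding expands:

```python
from typing import List, Sequence, Tuple


def sparse_to_dense(size: int, indices: Sequence[int], values: Sequence[float]) -> List[float]:
    """Expand a (size, indices, values) sparse encoding into a dense list."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size  # every position not listed is an implicit zero
    for index, value in zip(indices, values):
        if not 0 <= index < size:
            raise IndexError(f"index {index} is out of range for size {size}")
        dense[index] = value
    return dense


def sparse_from_pairs(size: int, entries: Sequence[Tuple[int, float]]) -> List[float]:
    """Same expansion, starting from (index, value) pairs of the nonzero entries."""
    return sparse_to_dense(size, [i for i, _ in entries], [v for _, v in entries])


print(sparse_to_dense(3, [0, 2], [1.0, 3.0]))     # [1.0, 0.0, 3.0]
print(sparse_from_pairs(3, [(0, 1.0), (2, 3.0)]))  # [1.0, 0.0, 3.0]
```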
[03/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/structured-streaming-kafka-integration.html -- diff --git a/site/docs/2.1.0/structured-streaming-kafka-integration.html b/site/docs/2.1.0/structured-streaming-kafka-integration.html index 5ca9259..7d2254f 100644 --- a/site/docs/2.1.0/structured-streaming-kafka-integration.html +++ b/site/docs/2.1.0/structured-streaming-kafka-integration.html @@ -144,7 +144,7 @@ application. See the Deploying subsection below. -// Subscribe to 1 topic +// Subscribe to 1 topic val ds1 = spark .readStream .format(kafka) @@ -172,12 +172,12 @@ application. See the Deploying subsection below. .option(subscribePattern, topic.*) .load() ds3.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) - .as[(String, String)] + .as[(String, String)] -// Subscribe to 1 topic +// Subscribe to 1 topic DatasetRow ds1 = spark .readStream() .format(kafka) @@ -202,43 +202,43 @@ application. See the Deploying subsection below. .option(kafka.bootstrap.servers, host1:port1,host2:port2) .option(subscribePattern, topic.*) .load() -ds3.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) +ds3.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) -# Subscribe to 1 topic +# Subscribe to 1 topic ds1 = spark .readStream() - .format(kafka) - .option(kafka.bootstrap.servers, host1:port1,host2:port2) - .option(subscribe, topic1) + .format(kafka) + .option(kafka.bootstrap.servers, host1:port1,host2:port2) + .option(subscribe, topic1) .load() -ds1.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) +ds1.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) -# Subscribe to multiple topics +# Subscribe to multiple topics ds2 = spark .readStream - .format(kafka) - .option(kafka.bootstrap.servers, host1:port1,host2:port2) - .option(subscribe, topic1,topic2) + .format(kafka) + .option(kafka.bootstrap.servers, host1:port1,host2:port2) + .option(subscribe, topic1,topic2) .load() -ds2.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) 
+ds2.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) -# Subscribe to a pattern +# Subscribe to a pattern ds3 = spark .readStream() - .format(kafka) - .option(kafka.bootstrap.servers, host1:port1,host2:port2) - .option(subscribePattern, topic.*) + .format(kafka) + .option(kafka.bootstrap.servers, host1:port1,host2:port2) + .option(subscribePattern, topic.*) .load() -ds3.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) +ds3.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) -Each row in the source has the following schema: - +Each row in the source has the following schema: +table class="table" ColumnType key @@ -268,7 +268,7 @@ application. See the Deploying subsection below. timestampType int - +/table The following options must be set for the Kafka source. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
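The Kafka snippets above select topics either by name (`subscribe`) or by regular expression (`subscribePattern`). They then cast the binary `key` and `value` columns to strings. The sketch below models that in plain Python. It assumes that the pattern must match the whole topic name and that the bytes are UTF-8. `KafkaRow`, `topics_matching`, and `key_value_as_strings` are hypothetical names, and the field types are simplified from the schema table.

```python
import re
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class KafkaRow:
    """Simplified stand-in for one row of the Kafka source schema."""
    key: Optional[bytes]
    value: bytes
    topic: str
    partition: int
    offset: int
    timestamp: int  # simplified to epoch milliseconds for this sketch
    timestampType: int


def topics_matching(pattern: str, topics: Iterable[str]) -> List[str]:
    """Keep topics whose entire name matches the regular expression."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


def key_value_as_strings(row: KafkaRow) -> Tuple[Optional[str], str]:
    """Rough analogue of CAST(key AS STRING), CAST(value AS STRING), assuming UTF-8."""
    key = row.key.decode("utf-8") if row.key is not None else None
    return key, row.value.decode("utf-8")


print(topics_matching("topic.*", ["topic1", "topic2", "other"]))  # ['topic1', 'topic2']
row = KafkaRow(b"k1", b"hello", "topic1", 0, 42, 1482900000000, 0)
print(key_value_as_strings(row))  # ('k1', 'hello')
```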
[16/25] spark-website git commit: Update 2.1.0 docs to include https://github.com/apache/spark/pull/16294
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/mllib-collaborative-filtering.html -- diff --git a/site/docs/2.1.0/mllib-collaborative-filtering.html b/site/docs/2.1.0/mllib-collaborative-filtering.html index e453032..b3f9e08 100644 --- a/site/docs/2.1.0/mllib-collaborative-filtering.html +++ b/site/docs/2.1.0/mllib-collaborative-filtering.html @@ -322,13 +322,13 @@ - Collaborative filtering - Explicit vs. implicit feedback - Scaling of the regularization parameter + Collaborative filtering + Explicit vs. implicit feedback + Scaling of the regularization parameter - Examples - Tutorial + Examples + Tutorial Collaborative filtering @@ -393,7 +393,7 @@ recommendation model by measuring the Mean Squared Error of rating prediction.Refer to the ALS Scala docs for more details on the API. -import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.MatrixFactorizationModel import org.apache.spark.mllib.recommendation.Rating @@ -434,9 +434,9 @@ recommendation model by measuring the Mean Squared Error of rating prediction.If the rating matrix is derived from another source of information (i.e. it is inferred from other signals), you can use the trainImplicit method to get better results. -val alpha = 0.01 +val alpha = 0.01 val lambda = 0.01 -val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha) +val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha) @@ -449,7 +449,7 @@ that is equivalent to the provided example in Scala is given below: Refer to the ALS Java docs for more details on the API. 
-import scala.Tuple2; +import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; @@ -458,8 +458,8 @@ that is equivalent to the provided example in Scala is given below: import org.apache.spark.mllib.recommendation.Rating; import org.apache.spark.SparkConf; -SparkConf conf = new SparkConf().setAppName(Java Collaborative Filtering Example); -JavaSparkContext jsc = new JavaSparkContext(conf); +SparkConf conf = new SparkConf().setAppName(Java Collaborative Filtering Example); +JavaSparkContext jsc = new JavaSparkContext(conf); // Load and parse the data String path = data/mllib/als/test.data; @@ -468,7 +468,7 @@ that is equivalent to the provided example in Scala is given below: new FunctionString, Rating() { public Rating call(String s) { String[] sarray = s.split(,); - return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), + return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2])); } } @@ -528,36 +528,36 @@ recommendation by measuring the Mean Squared Error of rating prediction. Refer to the ALS Python docs for more details on the API. 
-from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating +from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating -# Load and parse the data -data = sc.textFile(data/mllib/als/test.data) -ratings = data.map(lambda l: l.split(,))\ +# Load and parse the data +data = sc.textFile(data/mllib/als/test.data) +ratings = data.map(lambda l: l.split(,))\ .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) -# Build the recommendation model using Alternating Least Squares +# Build the recommendation model using Alternating Least Squares rank = 10 numIterations = 10 model = ALS.train(ratings, rank, numIterations) -# Evaluate the model on training data +# Evaluate the model on training data testdata = ratings.map(lambda p: (p[0], p[1])) predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions) MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean() -print(Mean Squared Error = + str(MSE)) +print(Mean Squared Error = + str(MSE)) -# Save and load model -model.save(sc, target/tmp/myCollaborativeFilter) -sameModel = MatrixFactorizationModel.load(sc, target/tmp/myCollaborativeFilter) +# Save and load model +model.save(sc, target/tmp/myCollaborativeFilter) +sameModel = MatrixFactorizationModel.load(sc, target/tmp/myCollaborativeFilter) Find full example code at "examples/src/main/python/mllib/recommendation_example.py" in the Spark repo. If the rating matrix is derived from other source of information (i.e. it is inferred from other signals), you can use the trainImplicit method to get better results. -# Build the recommendation model using Alternating Least Squares based on implicit ratings -model = ALS.trainImplicit(ratings, rank,
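The Python collaborative-filtering example above evaluates the model by joining observed ratings with predictions on (user, product) and averaging the squared differences. The following sketch performs the same computation without Spark, on small in-memory data. `mean_squared_error` and the sample numbers are illustrative only.

```python
from typing import Dict, List, Tuple

Rating = Tuple[int, int, float]  # (user, product, rating)


def mean_squared_error(ratings: List[Rating], predictions: Dict[Tuple[int, int], float]) -> float:
    """Inner-join ratings with predictions on (user, product) and average squared errors."""
    squared_errors = [
        (rating - predictions[(user, product)]) ** 2
        for user, product, rating in ratings
        if (user, product) in predictions  # pairs without a prediction drop out, like join()
    ]
    if not squared_errors:
        raise ValueError("no (user, product) pair has both a rating and a prediction")
    return sum(squared_errors) / len(squared_errors)


ratings = [(1, 1, 5.0), (1, 2, 1.0), (2, 1, 4.0)]
predictions = {(1, 1): 4.0, (1, 2): 1.0, (2, 1): 4.0}
print("Mean Squared Error = " + str(mean_squared_error(ratings, predictions)))
```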
spark git commit: [SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status
Repository: spark Updated Branches: refs/heads/branch-2.1 7197a7bc7 -> 80d583bd0 [SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status ## What changes were proposed in this pull request? - Extended the Window operation section with code snippet and explanation of watermarking - Extended the Output Mode section with a table showing the compatibility between query type and output mode - Rewrote the Monitoring section with updated jsons generated by StreamingQuery.progress/status - Updated API changes in the StreamingQueryListener example TODO - [x] Figure showing the watermarking ## How was this patch tested? N/A ## Screenshots ### Section: Windowed Aggregation with Event Time ![image](https://cloud.githubusercontent.com/assets/663212/21246197/0e02cb1a-c2dc-11e6-8816-0cd28d8201d7.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246241/45b0f87a-c2dc-11e6-9c29-d0a89e07bf8d.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246202/1652cefa-c2dc-11e6-8c64-3c05977fb3fc.png) ### Section: Output Modes ![image](https://cloud.githubusercontent.com/assets/663212/21246276/8ee44948-c2dc-11e6-9fa2-30502fcf9a55.png) ### Section: Monitoring ![image](https://cloud.githubusercontent.com/assets/663212/21246535/3c5baeb2-c2de-11e6-88cd-ca71db7c5cf9.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246574/789492c2-c2de-11e6-8471-7bef884e1837.png) Author: Tathagata Das. Closes #16294 from tdas/SPARK-18669.
(cherry picked from commit 092c6725bf039bf33299b53791e1958c4ea3f6aa) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80d583bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80d583bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80d583bd Branch: refs/heads/branch-2.1 Commit: 80d583bd09de54890cddfcc0c6fd807d7200ea75 Parents: 7197a7b Author: Tathagata Das Authored: Wed Dec 28 12:11:25 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 28 12:11:49 2016 -0800 -- docs/img/structured-streaming-watermark.png| Bin 0 -> 252000 bytes docs/img/structured-streaming.pptx | Bin 1105413 -> 1113902 bytes docs/structured-streaming-programming-guide.md | 460 +++- 3 files changed, 353 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80d583bd/docs/img/structured-streaming-watermark.png -- diff --git a/docs/img/structured-streaming-watermark.png b/docs/img/structured-streaming-watermark.png new file mode 100644 index 000..f21fbda Binary files /dev/null and b/docs/img/structured-streaming-watermark.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/80d583bd/docs/img/structured-streaming.pptx -- diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx index 6aad2ed..f5bdfc0 100644 Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/80d583bd/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 77b66b3..3b7d0c4 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide # Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL 
engine. You can express your streaming computation the same way you would express a batch computation on static data.The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.* -**Spark 2.0 is the ALPHA RELEASE of Structured Streaming** and the APIs are still experimental. In this guide,
spark git commit: [SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status
Repository: spark Updated Branches: refs/heads/master 6a475ae46 -> 092c6725b [SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status ## What changes were proposed in this pull request? - Extended the Window operation section with code snippet and explanation of watermarking - Extended the Output Mode section with a table showing the compatibility between query type and output mode - Rewrote the Monitoring section with updated jsons generated by StreamingQuery.progress/status - Updated API changes in the StreamingQueryListener example TODO - [x] Figure showing the watermarking ## How was this patch tested? N/A ## Screenshots ### Section: Windowed Aggregation with Event Time ![image](https://cloud.githubusercontent.com/assets/663212/21246197/0e02cb1a-c2dc-11e6-8816-0cd28d8201d7.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246241/45b0f87a-c2dc-11e6-9c29-d0a89e07bf8d.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246202/1652cefa-c2dc-11e6-8c64-3c05977fb3fc.png) ### Section: Output Modes ![image](https://cloud.githubusercontent.com/assets/663212/21246276/8ee44948-c2dc-11e6-9fa2-30502fcf9a55.png) ### Section: Monitoring ![image](https://cloud.githubusercontent.com/assets/663212/21246535/3c5baeb2-c2de-11e6-88cd-ca71db7c5cf9.png) ![image](https://cloud.githubusercontent.com/assets/663212/21246574/789492c2-c2de-11e6-8471-7bef884e1837.png) Author: Tathagata Das. Closes #16294 from tdas/SPARK-18669.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/092c6725 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/092c6725 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/092c6725 Branch: refs/heads/master Commit: 092c6725bf039bf33299b53791e1958c4ea3f6aa Parents: 6a475ae Author: Tathagata Das Authored: Wed Dec 28 12:11:25 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 28 12:11:25 2016 -0800 -- docs/img/structured-streaming-watermark.png| Bin 0 -> 252000 bytes docs/img/structured-streaming.pptx | Bin 1105413 -> 1113902 bytes docs/structured-streaming-programming-guide.md | 460 +++- 3 files changed, 353 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/img/structured-streaming-watermark.png -- diff --git a/docs/img/structured-streaming-watermark.png b/docs/img/structured-streaming-watermark.png new file mode 100644 index 000..f21fbda Binary files /dev/null and b/docs/img/structured-streaming-watermark.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/img/structured-streaming.pptx -- diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx index 6aad2ed..f5bdfc0 100644 Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 77b66b3..3b7d0c4 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide # Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. 
You can express your streaming computation the same way you would express a batch computation on static data.The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.* -**Spark 2.0 is the ALPHA RELEASE of Structured Streaming** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word
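Both SPARK-18669 commits add an explanation of watermarking to the guide. The sketch below is a rough mental model only, not Spark's implementation. It assumes that the watermark is recomputed after each micro-batch as the maximum event time seen so far minus a delay threshold. It also assumes that a window ending at time T stops accepting late data once the watermark exceeds T. It uses tumbling 10-minute windows and a 10-minute threshold, and all names and numbers are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

WINDOW_MINUTES = 10     # tumbling windows [start, start + 10)
THRESHOLD_MINUTES = 10  # how far behind the max event time data may arrive

Event = Tuple[int, str]  # (event time in minutes, word)


def run_with_watermark(batches: List[List[Event]]) -> Tuple[Dict[Tuple[int, str], int], List[Event]]:
    """Count words per window, dropping events whose window ended before the watermark."""
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    dropped: List[Event] = []
    max_event_time: Optional[int] = None
    watermark = float("-inf")  # no watermark until the first batch completes
    for batch in batches:
        for event_time, word in batch:
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
            window_start = (event_time // WINDOW_MINUTES) * WINDOW_MINUTES
            window_end = window_start + WINDOW_MINUTES
            if watermark > window_end:
                dropped.append((event_time, word))  # window already finalized
            else:
                counts[(window_start, word)] += 1
        # The watermark only moves forward, and only between batches.
        if max_event_time is not None:
            watermark = max(watermark, max_event_time - THRESHOLD_MINUTES)
    return dict(counts), dropped


batches = [[(2, "a"), (12, "b")], [(25, "a")], [(3, "a"), (14, "b")]]
counts, dropped = run_with_watermark(batches)
print(counts)   # {(0, 'a'): 1, (10, 'b'): 2, (20, 'a'): 1}
print(dropped)  # [(3, 'a')]
```

After the second batch, the watermark is 25 - 10 = 15. The late event at minute 3 (window [0, 10)) is therefore dropped, while the event at minute 14 (window [10, 20)) still updates its count. Treat this as an approximation: the real engine's handling of data that arrives later than the threshold may differ.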
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.0 [created] cd0a08361
spark git commit: [SPARK-17772][ML][TEST] Add test functions for ML sample weights
Repository: spark Updated Branches: refs/heads/master d7bce3bd3 -> 6a475ae46 [SPARK-17772][ML][TEST] Add test functions for ML sample weights ## What changes were proposed in this pull request? More and more ML algos are accepting sample weights, and they have been tested rather heterogeneously and with code duplication. This patch adds extensible helper methods to `MLTestingUtils` that can be reused by various algorithms accepting sample weights. Up to now, there seem to be a few tests that have been implemented commonly: * Check that oversampling is the same as giving the instances sample weights proportional to the number of samples * Check that outliers with tiny sample weights do not affect the algorithm's performance This patch adds an additional test: * Check that algorithms are invariant to constant scaling of the sample weights, i.e. uniform sample weights with `w_i = 1.0` are effectively the same as uniform sample weights with `w_i = 1` or `w_i = 0.0001` The instances of these tests occurred in LinearRegression, NaiveBayes, and LogisticRegression. Those tests have been removed/modified to use the new helper methods. These helper functions will be of use when [SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478) is implemented. ## How was this patch tested? This patch only involves modifying test suites. ## Other notes Both IsotonicRegression and GeneralizedLinearRegression also extend `HasWeightCol`. I did not modify these test suites because it will make this patch easier to review, and because they did not duplicate the same tests as the three suites that were modified. If we want to change them later, we can create a JIRA for it now, but it's open for debate. Author: sethah. Closes #15721 from sethah/SPARK-17772.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a475ae4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a475ae4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a475ae4 Branch: refs/heads/master Commit: 6a475ae466a7ce28d507244bf6db91be06ed81ef Parents: d7bce3b Author: sethah Authored: Wed Dec 28 07:01:14 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 28 07:01:14 2016 -0800 -- .../LogisticRegressionSuite.scala | 60 +++--- .../ml/classification/NaiveBayesSuite.scala | 81 + .../ml/regression/LinearRegressionSuite.scala | 120 ++- .../apache/spark/ml/util/MLTestingUtils.scala | 111 +++-- 4 files changed, 154 insertions(+), 218 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a475ae4/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index f8bcbee..1308210 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -1836,52 +1836,24 @@ class LogisticRegressionSuite .forall(x => x(0) >= x(1))) } - test("binary logistic regression with weighted data") { -val numClasses = 2 -val numPoints = 40 -val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark, - numClasses, numPoints) -val testData = Array.tabulate[LabeledPoint](numClasses) { i => - LabeledPoint(i.toDouble, Vectors.dense(i.toDouble)) -}.toSeq.toDF() -val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight") -val model = lr.fit(outlierData) -val results = model.transform(testData).select("label", "prediction").collect() - -// check that the predictions are the one to one mapping -results.foreach { case 
Row(label: Double, pred: Double) => - assert(label === pred) + test("logistic regression with sample weights") { +def modelEquals(m1: LogisticRegressionModel, m2: LogisticRegressionModel): Unit = { + assert(m1.coefficientMatrix ~== m2.coefficientMatrix absTol 0.05) + assert(m1.interceptVector ~== m2.interceptVector absTol 0.05) } -val (overSampledData, weightedData) = - MLTestingUtils.genEquivalentOversampledAndWeightedInstances(outlierData, "label", "features", -42L) -val weightedModel = lr.fit(weightedData) -val overSampledModel = lr.setWeightCol("").fit(overSampledData) -assert(weightedModel.coefficientMatrix ~== overSampledModel.coefficientMatrix relTol 0.01) - } - - test("multinomial
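The SPARK-17772 message lists three properties that a weighted estimator should satisfy:

- Oversampling an instance k times matches giving it weight k.
- Multiplying every weight by the same constant changes nothing.
- An outlier with a tiny weight barely moves the fit.

The toy weighted least-squares fit below checks all three. It is plain Python, not `MLTestingUtils` and not Spark's `LinearRegression`, and its data and tolerances are made up.

```python
from typing import Sequence, Tuple


def weighted_linear_fit(xs: Sequence[float], ys: Sequence[float], ws: Sequence[float]) -> Tuple[float, float]:
    """Closed-form weighted least squares for y = a + b * x; returns (a, b)."""
    total = sum(ws)
    x_bar = sum(w * x for w, x in zip(ws, xs)) / total
    y_bar = sum(w * y for w, y in zip(ws, ys)) / total
    sxy = sum(w * (x - x_bar) * (y - y_bar) for w, x, y in zip(ws, xs, ys))
    sxx = sum(w * (x - x_bar) ** 2 for w, x in zip(ws, xs))
    slope = sxy / sxx
    return y_bar - slope * x_bar, slope


def close(p: Tuple[float, float], q: Tuple[float, float], tol: float) -> bool:
    return all(abs(u - v) <= tol for u, v in zip(p, q))


xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 4.0, 7.0]
ws = [1.0, 2.0, 1.0, 3.0]  # integer-valued so they can be reproduced by repetition
base = weighted_linear_fit(xs, ys, ws)

# 1. Oversampling: repeating each instance w times equals weighting it by w.
over_xs = [x for x, w in zip(xs, ws) for _ in range(int(w))]
over_ys = [y for y, w in zip(ys, ws) for _ in range(int(w))]
oversampled = weighted_linear_fit(over_xs, over_ys, [1.0] * len(over_xs))

# 2. Scale invariance: multiplying all weights by a constant leaves the fit unchanged.
scaled = [weighted_linear_fit(xs, ys, [c * w for w in ws]) for c in (1e-4, 1e4)]

# 3. A wild outlier with a tiny weight barely moves the fit.
with_outlier = weighted_linear_fit(xs + [10.0], ys + [-100.0], ws + [1e-9])

print(close(base, oversampled, 1e-9))
print(all(close(base, s, 1e-6) for s in scaled))
print(close(base, with_outlier, 1e-5))
```

Comparing coefficients within a tolerance, rather than exactly, mirrors the `absTol`/`relTol` comparisons used in the test suites above, since floating-point sums taken in a different order rarely agree bit for bit.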
spark git commit: [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags
Repository: spark Updated Branches: refs/heads/branch-2.0 f124d35e2 -> 5ed2f1c11 [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags ## What changes were proposed in this pull request? This adds back a direct dependency on Scala library classes from spark-tags because its Scala annotations need them. ## How was this patch tested? Existing tests. Author: Sean Owen. Closes #16418 from srowen/SPARK-18993. (cherry picked from commit d7bce3bd31ec193274718042dc017706989d7563) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ed2f1c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ed2f1c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ed2f1c1 Branch: refs/heads/branch-2.0 Commit: 5ed2f1c1118762191918e08936376113e6324935 Parents: f124d35 Author: Sean Owen Authored: Wed Dec 28 12:17:33 2016 + Committer: Sean Owen Committed: Wed Dec 28 12:17:53 2016 + -- common/tags/pom.xml | 8 1 file changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ed2f1c1/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index ecdc9e0..ca68ee2 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -35,6 +35,14 @@ tags + + + org.scala-lang + scala-library + ${scala.version} + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes
spark git commit: [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags
Repository: spark Updated Branches: refs/heads/branch-2.1 ac7107fe7 -> 7197a7bc7 [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags ## What changes were proposed in this pull request? This adds back a direct dependency on Scala library classes from spark-tags because its Scala annotations need them. ## How was this patch tested? Existing tests. Author: Sean Owen. Closes #16418 from srowen/SPARK-18993. (cherry picked from commit d7bce3bd31ec193274718042dc017706989d7563) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7197a7bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7197a7bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7197a7bc Branch: refs/heads/branch-2.1 Commit: 7197a7bc7061e2908b6430f494dba378378d5d02 Parents: ac7107f Author: Sean Owen Authored: Wed Dec 28 12:17:33 2016 + Committer: Sean Owen Committed: Wed Dec 28 12:17:41 2016 + -- common/tags/pom.xml | 8 1 file changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7197a7bc/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index 0778ee3..ad29848 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -34,6 +34,14 @@ tags + + + org.scala-lang + scala-library + ${scala.version} + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes
spark git commit: [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags
Repository: spark Updated Branches: refs/heads/master 2a5f52a71 -> d7bce3bd3 [SPARK-18993][BUILD] Unable to build/compile Spark in IntelliJ due to missing Scala deps in spark-tags ## What changes were proposed in this pull request? This adds back a direct dependency on Scala library classes from spark-tags because its Scala annotations need them. ## How was this patch tested? Existing tests. Author: Sean Owen. Closes #16418 from srowen/SPARK-18993. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7bce3bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7bce3bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7bce3bd Branch: refs/heads/master Commit: d7bce3bd31ec193274718042dc017706989d7563 Parents: 2a5f52a Author: Sean Owen Authored: Wed Dec 28 12:17:33 2016 + Committer: Sean Owen Committed: Wed Dec 28 12:17:33 2016 + -- common/tags/pom.xml | 8 1 file changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7bce3bd/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index 09f6fa1..9345dc8 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -34,6 +34,14 @@ tags + + + org.scala-lang + scala-library + ${scala.version} + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes
spark git commit: [MINOR][DOC] Fix doc of ForeachWriter to use writeStream
Repository: spark Updated Branches: refs/heads/master 76e9bd748 -> 2a5f52a71 [MINOR][DOC] Fix doc of ForeachWriter to use writeStream ## What changes were proposed in this pull request? Fix the documentation of `ForeachWriter` to use `writeStream` instead of `write` for a streaming dataset. ## How was this patch tested? Docs only. Author: Carson Wang. Closes #16419 from carsonwang/FixDoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a5f52a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a5f52a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a5f52a7 Branch: refs/heads/master Commit: 2a5f52a7146abc05bf70e65eb2267cd869ac4789 Parents: 76e9bd7 Author: Carson Wang Authored: Wed Dec 28 12:12:44 2016 + Committer: Sean Owen Committed: Wed Dec 28 12:12:44 2016 + -- sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a5f52a7/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala index b94ad59..372ec26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala @@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * * Scala example: * {{{ - * datasetOfString.write.foreach(new ForeachWriter[String] { + * datasetOfString.writeStream.foreach(new ForeachWriter[String] { * * def open(partitionId: Long, version: Long): Boolean = { * // open connection @@ -46,7 +46,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * * Java example: * {{{ - * datasetOfString.write().foreach(new ForeachWriter() { + * datasetOfString.writeStream().foreach(new ForeachWriter() { * *@Override
*public boolean open(long partitionId, long version) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][DOC] Fix doc of ForeachWriter to use writeStream
Repository: spark Updated Branches: refs/heads/branch-2.1 ca25b1e51 -> ac7107fe7 [MINOR][DOC] Fix doc of ForeachWriter to use writeStream ## What changes were proposed in this pull request? Fix the document of `ForeachWriter` to use `writeStream` instead of `write` for a streaming dataset. ## How was this patch tested? Docs only. Author: Carson Wang Closes #16419 from carsonwang/FixDoc. (cherry picked from commit 2a5f52a7146abc05bf70e65eb2267cd869ac4789) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac7107fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac7107fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac7107fe Branch: refs/heads/branch-2.1 Commit: ac7107fe70fcd0b584001c10dd624a4d8757109c Parents: ca25b1e Author: Carson Wang Authored: Wed Dec 28 12:12:44 2016 + Committer: Sean Owen Committed: Wed Dec 28 12:12:54 2016 + -- sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ac7107fe/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala index b94ad59..372ec26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala @@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * * Scala example: * {{{ - * datasetOfString.write.foreach(new ForeachWriter[String] { + * datasetOfString.writeStream.foreach(new ForeachWriter[String] { * * def open(partitionId: Long, version: Long): Boolean = { * // open connection @@ -46,7 +46,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * * Java example: * {{{ - * 
datasetOfString.write().foreach(new ForeachWriter() { + * datasetOfString.writeStream().foreach(new ForeachWriter() { * *@Override *public boolean open(long partitionId, long version) {
spark git commit: [SPARK-18960][SQL][SS] Avoid double reading file which is being copied.
Repository: spark Updated Branches: refs/heads/master 67fb33e7e -> 76e9bd748 [SPARK-18960][SQL][SS] Avoid double reading file which is being copied. ## What changes were proposed in this pull request? In HDFS, when we copy a file into a target directory, a temporary `._COPYING_` file exists for a period of time that depends on the file size. If we do not skip this file, we may read the same data twice. ## How was this patch tested? Updated unit test. Author: uncleGen Closes #16370 from uncleGen/SPARK-18960. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e9bd74 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e9bd74 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e9bd74 Branch: refs/heads/master Commit: 76e9bd74885a99462ed0957aad37cbead7f14de2 Parents: 67fb33e Author: uncleGen Authored: Wed Dec 28 10:42:47 2016 + Committer: Sean Owen Committed: Wed Dec 28 10:42:47 2016 + -- .../datasources/PartitioningAwareFileIndex.scala | 11 --- .../spark/sql/execution/datasources/FileIndexSuite.scala | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 825a0f7..82c1599 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging { /** Checks if we should filter out this path name. 
*/ def shouldFilterOut(pathName: String): Boolean = { -// We filter everything that starts with _ and ., except _common_metadata and _metadata +// We filter follow paths: +// 1. everything that starts with _ and ., except _common_metadata and _metadata // because Parquet needs to find those metadata files from leaf files returned by this method. // We should refactor this logic to not mix metadata files with data files. -((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") +// 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we +// should skip this file in case of double reading. +val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || + pathName.startsWith(".") || pathName.endsWith("._COPYING_") +val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") +exclude && !include } } http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b7a472b..2b4c9f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext { assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) +assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_")) } test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
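The new filtering rule can be tried out on its own. The sketch below is a Java transliteration of the Scala `shouldFilterOut` logic shown in the diff. It only illustrates which file names are skipped and is not Spark's implementation. The class name `PathFilterSketch` and the sample file names are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative Java port of the shouldFilterOut rule from the diff above (not Spark code).
class PathFilterSketch {

    /** Returns true if a leaf file name should be skipped when listing a table directory. */
    static boolean shouldFilterOut(String pathName) {
        // Excluded: "_" prefix (unless it contains "=", i.e. looks like a partition
        // directory such as "_year=2016"), "." prefix, or an in-progress copy "._COPYING_".
        boolean exclude = (pathName.startsWith("_") && !pathName.contains("="))
                || pathName.startsWith(".")
                || pathName.endsWith("._COPYING_");
        // Parquet summary files are kept so Parquet can still locate them.
        boolean include = pathName.startsWith("_common_metadata")
                || pathName.startsWith("_metadata");
        return exclude && !include;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("part-00000.parquet", "_metadata", "_ab_metadata",
                "a._COPYING_", ".hidden", "_year=2016");
        for (String name : names) {
            System.out.println(name + " -> " + (shouldFilterOut(name) ? "skip" : "keep"));
        }
    }
}
```

In this sketch, `_year=2016` is kept because the `=` marks it as a partition directory. A name such as `a._COPYING_` is dropped even though it has neither a `_` nor a `.` prefix.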
spark git commit: [MINOR][ML] Correct test cases of LoR raw2prediction & probability2prediction.
Repository: spark Updated Branches: refs/heads/master 79ff85363 -> 9cff67f34 [MINOR][ML] Correct test cases of LoR raw2prediction & probability2prediction. ## What changes were proposed in this pull request? Correct test cases of `LogisticRegression` raw2prediction & probability2prediction. ## How was this patch tested? Changed unit tests. Author: Yanbo Liang Closes #16407 from yanboliang/raw-probability. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cff67f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cff67f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cff67f3 Branch: refs/heads/master Commit: 9cff67f3465bc6ffe1b5abee9501e3c17f8fd194 Parents: 79ff853 Author: Yanbo Liang Authored: Wed Dec 28 01:24:18 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 28 01:24:18 2016 -0800 -- .../LogisticRegressionSuite.scala | 20 ++-- 1 file changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9cff67f3/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 9c4c59a..f8bcbee 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -359,8 +359,16 @@ class LogisticRegressionSuite assert(pred == predFromProb) } -// force it to use probability2prediction +// force it to use raw2prediction model.setProbabilityCol("") +val resultsUsingRaw2Predict = + model.transform(smallMultinomialDataset).select("prediction").as[Double].collect() + resultsUsingRaw2Predict.zip(results.select("prediction").as[Double].collect()).foreach { + case (pred1, pred2) => assert(pred1 === pred2) +} + +// force it 
to use probability2prediction +model.setRawPredictionCol("") val resultsUsingProb2Predict = model.transform(smallMultinomialDataset).select("prediction").as[Double].collect() resultsUsingProb2Predict.zip(results.select("prediction").as[Double].collect()).foreach { @@ -405,8 +413,16 @@ class LogisticRegressionSuite assert(pred == predFromProb) } -// force it to use probability2prediction +// force it to use raw2prediction model.setProbabilityCol("") +val resultsUsingRaw2Predict = + model.transform(smallBinaryDataset).select("prediction").as[Double].collect() + resultsUsingRaw2Predict.zip(results.select("prediction").as[Double].collect()).foreach { + case (pred1, pred2) => assert(pred1 === pred2) +} + +// force it to use probability2prediction +model.setRawPredictionCol("") val resultsUsingProb2Predict = model.transform(smallBinaryDataset).select("prediction").as[Double].collect() resultsUsingProb2Predict.zip(results.select("prediction").as[Double].collect()).foreach {
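The corrected test asserts that predictions derived from raw scores agree with predictions derived from probabilities. The sketch below shows why they should, assuming the usual logistic-regression relationships:

- In the multinomial case, probabilities are the softmax of the raw margins.
- In the binary case, the sigmoid probability of class 1 is compared against a threshold `t`.

The class and method names are hypothetical, and this is not Spark's code.

```java
// Hypothetical sketch of two equivalent ways to turn model output into a class label.
class PredictionPathsSketch {

    /** Index of the largest element (the first one on ties). */
    static int argmax(double[] v) {
        int best = 0;
        for (int i = 1; i < v.length; i++) {
            if (v[i] > v[best]) best = i;
        }
        return best;
    }

    /** Numerically stable softmax: subtract the largest margin before exponentiating. */
    static double[] softmax(double[] raw) {
        double max = raw[argmax(raw)];
        double[] p = new double[raw.length];
        double sum = 0.0;
        for (int i = 0; i < raw.length; i++) {
            p[i] = Math.exp(raw[i] - max);
            sum += p[i];
        }
        for (int i = 0; i < p.length; i++) p[i] /= sum;
        return p;
    }

    // Multinomial: predict from the raw margins, or from the class probabilities.
    static int raw2predictionMulti(double[] raw) { return argmax(raw); }
    static int probability2predictionMulti(double[] raw) { return argmax(softmax(raw)); }

    // Binary, assuming 0 < t < 1: compare the margin with the log-odds of t ...
    static int raw2predictionBinary(double margin, double t) {
        double rawThreshold = Math.log(t / (1.0 - t));
        return margin > rawThreshold ? 1 : 0;
    }

    // ... or compare the sigmoid probability of class 1 with t directly.
    static int probability2predictionBinary(double margin, double t) {
        double p1 = 1.0 / (1.0 + Math.exp(-margin));
        return p1 > t ? 1 : 0;
    }

    public static void main(String[] args) {
        double[] raw = {0.3, 2.1, -1.0};
        System.out.println(raw2predictionMulti(raw) + " " + probability2predictionMulti(raw));
        System.out.println(raw2predictionBinary(0.2, 0.6) + " " + probability2predictionBinary(0.2, 0.6));
    }
}
```

Both paths pick the same class for two reasons:

- Softmax preserves the ordering of the margins, so the argmax is unchanged.
- The sigmoid is strictly increasing, so comparing the margin with `log(t / (1 - t))` is equivalent to comparing the probability with `t`.

Thresholds of exactly 0 or 1 are deliberately left out of this sketch.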
spark git commit: [SPARK-17645][MLLIB][ML] add feature selector method based on: False Discovery Rate (FDR) and Family wise error rate (FWE)
Repository: spark Updated Branches: refs/heads/master 2af8b5cff -> 79ff85363 [SPARK-17645][MLLIB][ML] add feature selector method based on: False Discovery Rate (FDR) and Family wise error rate (FWE) ## What changes were proposed in this pull request? Univariate feature selection works by selecting the best features based on univariate statistical tests. FDR and FWE are popular univariate statistical tests for feature selection. In 2005, the Benjamini and Hochberg paper on FDR was identified as one of the 25 most-cited statistical papers. In this PR, FDR uses the Benjamini-Hochberg procedure. https://en.wikipedia.org/wiki/False_discovery_rate. In statistics, FWE is the probability of making one or more false discoveries, or type I errors, among all the hypotheses when performing multiple hypothesis tests. https://en.wikipedia.org/wiki/Family-wise_error_rate This PR adds FDR and FWE methods to ChiSqSelector, as implemented in scikit-learn. http://scikit-learn.org/stable/modules/feature_selection.html#univariate-feature-selection ## How was this patch tested? Unit tests will be added soon. Author: Peng Author: Peng, Meng Closes #15212 from mpjlu/fdr_fwe. 
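The p-value-based selector types can be sketched as follows. This illustrates the statistics only and is not the `ChiSqSelector` code. It rests on these assumptions:

- It takes precomputed p-values, one per feature, instead of running the chi-squared test.
- It implements FDR with the Benjamini-Hochberg step-up rule.
- It assumes FWE is controlled with a Bonferroni correction (`p < alpha / n`).

The class name and the sample p-values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of p-value-based feature selection (not Spark's ChiSqSelector).
class PValueSelectorSketch {

    /** FPR: keep every feature whose p-value is below alpha. */
    static List<Integer> selectFpr(double[] pValues, double alpha) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < pValues.length; i++) {
            if (pValues[i] < alpha) selected.add(i);
        }
        return selected;
    }

    /** FWE, assuming a Bonferroni correction: keep features with p < alpha / n. */
    static List<Integer> selectFwe(double[] pValues, double alpha) {
        return selectFpr(pValues, alpha / pValues.length);
    }

    /**
     * FDR via Benjamini-Hochberg: sort p-values ascending, find the largest 1-based
     * rank k with p_(k) <= alpha * k / n, and keep the k features with the smallest p-values.
     */
    static List<Integer> selectFdr(double[] pValues, double alpha) {
        int n = pValues.length;
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) order[i] = i;
        Arrays.sort(order, Comparator.comparingDouble(i -> pValues[i]));
        int k = 0; // number of features to keep; stays 0 if no rank passes
        for (int rank = 1; rank <= n; rank++) {
            if (pValues[order[rank - 1]] <= alpha * rank / n) k = rank;
        }
        List<Integer> selected = new ArrayList<>();
        for (int r = 0; r < k; r++) selected.add(order[r]);
        selected.sort(null); // report feature indices in their original order
        return selected;
    }

    public static void main(String[] args) {
        double[] pValues = {0.001, 0.008, 0.039, 0.041, 0.042, 0.06, 0.074, 0.205};
        double alpha = 0.05;
        System.out.println("fpr: " + selectFpr(pValues, alpha));
        System.out.println("fdr: " + selectFdr(pValues, alpha));
        System.out.println("fwe: " + selectFwe(pValues, alpha));
    }
}
```

With these eight p-values and `alpha = 0.05`:

- `fpr` keeps features 0 to 4.
- `fdr` keeps features 0 and 1.
- `fwe` keeps only feature 0.

This shows how the stricter error-rate definitions shrink the selection.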
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79ff8536 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79ff8536 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79ff8536 Branch: refs/heads/master Commit: 79ff8536315aef97ee940c52d71cd8de777c7ce6 Parents: 2af8b5c Author: Peng Authored: Wed Dec 28 00:49:36 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 28 00:49:36 2016 -0800 -- docs/ml-features.md | 6 +- docs/mllib-feature-extraction.md| 4 +- .../apache/spark/ml/feature/ChiSqSelector.scala | 48 +- .../spark/mllib/api/python/PythonMLLibAPI.scala | 4 + .../spark/mllib/feature/ChiSqSelector.scala | 62 ++-- .../spark/ml/feature/ChiSqSelectorSuite.scala | 6 + .../mllib/feature/ChiSqSelectorSuite.scala | 147 +++ python/pyspark/ml/feature.py| 74 +- python/pyspark/mllib/feature.py | 50 ++- 9 files changed, 337 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79ff8536/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index ca1ccc4..1d34497 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1423,12 +1423,12 @@ for more details on the API. `ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the [Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which -features to choose. It supports three selection methods: `numTopFeatures`, `percentile`, `fpr`: - +features to choose. It supports five selection methods: `numTopFeatures`, `percentile`, `fpr`, `fdr`, `fwe`: * `numTopFeatures` chooses a fixed number of top features according to a chi-squared test. This is akin to yielding the features with the most predictive power. * `percentile` is similar to `numTopFeatures` but chooses a fraction of all features instead of a fixed number. 
* `fpr` chooses all features whose p-value is below a threshold, thus controlling the false positive rate of selection. - +* `fdr` uses the [Benjamini-Hochberg procedure](https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure) to choose all features whose false discovery rate is below a threshold. +* `fwe` chooses all features whose p-values is below a threshold, thus controlling the family-wise error rate of selection. By default, the selection method is `numTopFeatures`, with the default number of top features set to 50. The user can choose a selection method using `setSelectorType`. http://git-wip-us.apache.org/repos/asf/spark/blob/79ff8536/docs/mllib-feature-extraction.md -- diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 42568c3..acd2894 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -227,11 +227,13 @@ both speed and statistical learning behavior. [`ChiSqSelector`](api/scala/index.html#org.apache.spark.mllib.feature.ChiSqSelector) implements Chi-Squared feature selection. It operates on labeled data with