[SPARK-24499][SQL][DOC] Split the page of sql-programming-guide.html to multiple separate pages
1. Split the main page of sql-programming-guide into 7 parts: - Getting Started - Data Sources - Performance Tuning - Distributed SQL Engine - PySpark Usage Guide for Pandas with Apache Arrow - Migration Guide - Reference 2. Add a left menu for sql-programming-guide, keeping the first-level index for each part in the menu. ![image](https://user-images.githubusercontent.com/4833765/47016859-6332e180-d183-11e8-92e8-ce62518a83c4.png) Local test with jekyll build/serve. Closes #22746 from xuanyuanking/SPARK-24499. Authored-by: Yuanjian Li <xyliyuanj...@gmail.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit 987f386588de7311b066cf0f62f0eed64d4aa7d7) Signed-off-by: gatorsmile <gatorsm...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71535516 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71535516 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71535516 Branch: refs/heads/branch-2.4 Commit: 71535516419831242fa7fc9177e8f5fdd3c6112b Parents: 71a6a9c Author: Yuanjian Li <xyliyuanj...@gmail.com> Authored: Thu Oct 18 11:59:06 2018 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Oct 18 12:12:05 2018 -0700 ---------------------------------------------------------------------- docs/_data/menu-sql.yaml | 81 + docs/_includes/nav-left-wrapper-sql.html | 6 + docs/_includes/nav-left.html | 3 +- docs/_layouts/global.html | 8 +- docs/avro-data-source-guide.md | 380 --- docs/ml-pipeline.md | 2 +- docs/sparkr.md | 6 +- docs/sql-data-sources-avro.md | 380 +++ docs/sql-data-sources-hive-tables.md | 166 + docs/sql-data-sources-jdbc.md | 223 ++ docs/sql-data-sources-json.md | 81 + docs/sql-data-sources-load-save-functions.md | 283 ++ docs/sql-data-sources-orc.md | 26 + docs/sql-data-sources-parquet.md | 321 ++ docs/sql-data-sources-troubleshooting.md | 9 + docs/sql-data-sources.md | 42 + docs/sql-distributed-sql-engine.md | 84 + 
docs/sql-getting-started.md | 369 +++ docs/sql-migration-guide-hive-compatibility.md | 137 + docs/sql-migration-guide-upgrade.md | 516 +++ docs/sql-migration-guide.md | 23 + docs/sql-performance-turing.md | 151 + docs/sql-programming-guide.md | 3119 +------------------ docs/sql-pyspark-pandas-with-arrow.md | 166 + docs/sql-reference.md | 641 ++++ docs/structured-streaming-programming-guide.md | 2 +- 26 files changed, 3723 insertions(+), 3502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/_data/menu-sql.yaml ---------------------------------------------------------------------- diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml new file mode 100644 index 0000000..6718763 --- /dev/null +++ b/docs/_data/menu-sql.yaml @@ -0,0 +1,81 @@ +- text: Getting Started + url: sql-getting-started.html + subitems: + - text: "Starting Point: SparkSession" + url: sql-getting-started.html#starting-point-sparksession + - text: Creating DataFrames + url: sql-getting-started.html#creating-dataframes + - text: Untyped Dataset Operations (DataFrame operations) + url: sql-getting-started.html#untyped-dataset-operations-aka-dataframe-operations + - text: Running SQL Queries Programmatically + url: sql-getting-started.html#running-sql-queries-programmatically + - text: Global Temporary View + url: sql-getting-started.html#global-temporary-view + - text: Creating Datasets + url: sql-getting-started.html#creating-datasets + - text: Interoperating with RDDs + url: sql-getting-started.html#interoperating-with-rdds + - text: Aggregations + url: sql-getting-started.html#aggregations +- text: Data Sources + url: sql-data-sources.html + subitems: + - text: "Generic Load/Save Functions" + url: sql-data-sources-load-save-functions.html + - text: Parquet Files + url: sql-data-sources-parquet.html + - text: ORC Files + url: sql-data-sources-orc.html + - text: JSON Files + url: 
sql-data-sources-json.html + - text: Hive Tables + url: sql-data-sources-hive-tables.html + - text: JDBC To Other Databases + url: sql-data-sources-jdbc.html + - text: Avro Files + url: sql-data-sources-avro.html + - text: Troubleshooting + url: sql-data-sources-troubleshooting.html +- text: Performance Turing + url: sql-performance-turing.html + subitems: + - text: Caching Data In Memory + url: sql-performance-turing.html#caching-data-in-memory + - text: Other Configuration Options + url: sql-performance-turing.html#other-configuration-options + - text: Broadcast Hint for SQL Queries + url: sql-performance-turing.html#broadcast-hint-for-sql-queries +- text: Distributed SQL Engine + url: sql-distributed-sql-engine.html + subitems: + - text: "Running the Thrift JDBC/ODBC server" + url: sql-distributed-sql-engine.html#running-the-thrift-jdbcodbc-server + - text: Running the Spark SQL CLI + url: sql-distributed-sql-engine.html#running-the-spark-sql-cli +- text: PySpark Usage Guide for Pandas with Apache Arrow + url: sql-pyspark-pandas-with-arrow.html + subitems: + - text: Apache Arrow in Spark + url: sql-pyspark-pandas-with-arrow.html#apache-arrow-in-spark + - text: "Enabling for Conversion to/from Pandas" + url: sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas + - text: "Pandas UDFs (a.k.a. 
Vectorized UDFs)" + url: sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs + - text: Usage Notes + url: sql-pyspark-pandas-with-arrow.html#usage-notes +- text: Migration Guide + url: sql-migration-guide.html + subitems: + - text: Spark SQL Upgrading Guide + url: sql-migration-guide-upgrade.html + - text: Compatibility with Apache Hive + url: sql-migration-guide-hive-compatibility.html +- text: Reference + url: sql-reference.html + subitems: + - text: Data Types + url: sql-reference.html#data-types + - text: NaN Semantics + url: sql-reference.html#nan-semantics + - text: Arithmetic operations + url: sql-reference.html#arithmetic-operations http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/_includes/nav-left-wrapper-sql.html ---------------------------------------------------------------------- diff --git a/docs/_includes/nav-left-wrapper-sql.html b/docs/_includes/nav-left-wrapper-sql.html new file mode 100644 index 0000000..edc4cf4 --- /dev/null +++ b/docs/_includes/nav-left-wrapper-sql.html @@ -0,0 +1,6 @@ +<div class="left-menu-wrapper"> + <div class="left-menu"> + <h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3> + {% include nav-left.html nav=include.nav-sql %} + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/_includes/nav-left.html ---------------------------------------------------------------------- diff --git a/docs/_includes/nav-left.html b/docs/_includes/nav-left.html index 73176f4..19d68fd 100644 --- a/docs/_includes/nav-left.html +++ b/docs/_includes/nav-left.html @@ -10,7 +10,8 @@ {% endif %} </a> </li> - {% if item.subitems and navurl contains item.url %} + {% assign tag = item.url | remove: ".html" %} + {% if item.subitems and navurl contains tag %} {% include nav-left.html nav=item.subitems %} {% endif %} {% endfor %} http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/_layouts/global.html 
---------------------------------------------------------------------- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 77edebe..cbe4306 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -126,8 +126,12 @@ <div class="container-wrapper"> - {% if page.url contains "/ml" %} - {% include nav-left-wrapper-ml.html nav-mllib=site.data.menu-mllib nav-ml=site.data.menu-ml %} + {% if page.url contains "/ml" or page.url contains "/sql" %} + {% if page.url contains "/ml" %} + {% include nav-left-wrapper-ml.html nav-mllib=site.data.menu-mllib nav-ml=site.data.menu-ml %} + {% else %} + {% include nav-left-wrapper-sql.html nav-sql=site.data.menu-sql %} + {% endif %} <input id="nav-trigger" class="nav-trigger" checked type="checkbox"> <label for="nav-trigger"></label> <div class="content-with-sidebar" id="content"> http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/avro-data-source-guide.md ---------------------------------------------------------------------- diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md deleted file mode 100644 index d3b81f0..0000000 --- a/docs/avro-data-source-guide.md +++ /dev/null @@ -1,380 +0,0 @@ ---- -layout: global -title: Apache Avro Data Source Guide ---- - -* This will become a table of contents (this text will be scraped). -{:toc} - -Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. - -## Deploying -The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. - -As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` -and its dependencies can be directly added to `spark-submit` using `--packages`, such as, - - ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... 
- -For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly, - - ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... - -See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. - -## Load and Save Functions - -Since `spark-avro` module is external, there is no `.avro` API in -`DataFrameReader` or `DataFrameWriter`. - -To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`). -<div class="codetabs"> -<div data-lang="scala" markdown="1"> -{% highlight scala %} - -val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") -usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") - -{% endhighlight %} -</div> -<div data-lang="java" markdown="1"> -{% highlight java %} - -Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); -usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); - -{% endhighlight %} -</div> -<div data-lang="python" markdown="1"> -{% highlight python %} - -df = spark.read.format("avro").load("examples/src/main/resources/users.avro") -df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") - -{% endhighlight %} -</div> -<div data-lang="r" markdown="1"> -{% highlight r %} - -df <- read.df("examples/src/main/resources/users.avro", "avro") -write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") - -{% endhighlight %} -</div> -</div> - -## to_avro() and from_avro() -The Avro package provides function `to_avro` to encode a column as binary in Avro -format, and `from_avro()` to decode Avro binary data into a column. 
Both functions transform one column to -another column, and the input/output SQL data type can be complex type or primitive type. - -Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each -Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. -* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. -* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. - -Both functions are currently only available in Scala and Java. - -<div class="codetabs"> -<div data-lang="scala" markdown="1"> -{% highlight scala %} -import org.apache.spark.sql.avro._ - -// `from_avro` requires Avro schema in JSON string format. -val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) - -val df = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("subscribe", "topic1") - .load() - -// 1. Decode the Avro data into a struct; -// 2. Filter by column `favorite_color`; -// 3. Encode the column `name` in Avro format. -val output = df - .select(from_avro('value, jsonFormatSchema) as 'user) - .where("user.favorite_color == \"red\"") - .select(to_avro($"user.name") as 'value) - -val query = output - .writeStream - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("topic", "topic2") - .start() - -{% endhighlight %} -</div> -<div data-lang="java" markdown="1"> -{% highlight java %} -import org.apache.spark.sql.avro.*; - -// `from_avro` requires Avro schema in JSON string format. 
-String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))); - -Dataset<Row> df = spark - .readStream() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("subscribe", "topic1") - .load(); - -// 1. Decode the Avro data into a struct; -// 2. Filter by column `favorite_color`; -// 3. Encode the column `name` in Avro format. -Dataset<Row> output = df - .select(from_avro(col("value"), jsonFormatSchema).as("user")) - .where("user.favorite_color == \"red\"") - .select(to_avro(col("user.name")).as("value")); - -StreamingQuery query = output - .writeStream() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("topic", "topic2") - .start(); - -{% endhighlight %} -</div> -</div> - -## Data Source Option - -Data source options of Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`. -<table class="table"> - <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr> - <tr> - <td><code>avroSchema</code></td> - <td>None</td> - <td>Optional Avro schema provided by an user in JSON format. 
The date type and naming of record fields - should match the input Avro data or Catalyst data, otherwise the read/write action will fail.</td> - <td>read and write</td> - </tr> - <tr> - <td><code>recordName</code></td> - <td>topLevelRecord</td> - <td>Top level record name in write result, which is required in Avro spec.</td> - <td>write</td> - </tr> - <tr> - <td><code>recordNamespace</code></td> - <td>""</td> - <td>Record namespace in write result.</td> - <td>write</td> - </tr> - <tr> - <td><code>ignoreExtension</code></td> - <td>true</td> - <td>The option controls ignoring of files without <code>.avro</code> extensions in read.<br> If the option is enabled, all files (with and without <code>.avro</code> extension) are loaded.</td> - <td>read</td> - </tr> - <tr> - <td><code>compression</code></td> - <td>snappy</td> - <td>The <code>compression</code> option allows to specify a compression codec used in write.<br> - Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> config is taken into account.</td> - <td>write</td> - </tr> -</table> - -## Configuration -Configuration of Avro can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL. -<table class="table"> - <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr> - <tr> - <td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td> - <td>true</td> - <td>If it is set to true, the data source provider <code>com.databricks.spark.avro</code> is mapped to the built-in but external Avro data source module for backward compatibility.</td> - </tr> - <tr> - <td>spark.sql.avro.compression.codec</td> - <td>snappy</td> - <td>Compression codec used in writing of AVRO files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. 
Default codec is snappy.</td> - </tr> - <tr> - <td>spark.sql.avro.deflate.level</td> - <td>-1</td> - <td>Compression level for the deflate codec used in writing of AVRO files. Valid value must be in the range of from 1 to 9 inclusive or -1. The default value is -1 which corresponds to 6 level in the current implementation.</td> - </tr> -</table> - -## Compatibility with Databricks spark-avro -This Avro data source module is originally from and compatible with Databricks's open source repository -[spark-avro](https://github.com/databricks/spark-avro). - -By default with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is -mapped to this built-in Avro module. For the Spark tables created with `Provider` property as `com.databricks.spark.avro` in -catalog meta store, the mapping is essential to load these tables if you are using this built-in Avro module. - -Note in Databricks's [spark-avro](https://github.com/databricks/spark-avro), implicit classes -`AvroDataFrameWriter` and `AvroDataFrameReader` were created for shortcut function `.avro()`. In this -built-in but external module, both implicit classes are removed. Please use `.format("avro")` in -`DataFrameWriter` or `DataFrameReader` instead, which should be clean and good enough. - -If you prefer using your own build of `spark-avro` jar file, you can simply disable the configuration -`spark.sql.legacy.replaceDatabricksSparkAvro.enabled`, and use the option `--jars` on deploying your -applications. Read the [Advanced Dependency Management](https://spark.apache -.org/docs/latest/submitting-applications.html#advanced-dependency-management) section in Application -Submission Guide for more details. 
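
As an aside, the codec-selection precedence documented above (the per-write `compression` option wins, then the `spark.sql.avro.compression.codec` configuration, then the `snappy` default) can be sketched in plain Python. This is an illustrative model only, not Spark's actual implementation; the function and dict shapes are assumptions for the sketch:

```python
def resolve_avro_codec(write_options, sql_conf):
    """Pick the write codec per the documented precedence:
    per-write `compression` option, else the
    `spark.sql.avro.compression.codec` configuration, else snappy."""
    supported = {"uncompressed", "snappy", "deflate", "bzip2", "xz"}
    codec = (write_options.get("compression")
             or sql_conf.get("spark.sql.avro.compression.codec")
             or "snappy")
    if codec not in supported:
        raise ValueError("unsupported Avro codec: " + codec)
    return codec

print(resolve_avro_codec({}, {}))  # snappy
print(resolve_avro_codec({}, {"spark.sql.avro.compression.codec": "deflate"}))  # deflate
print(resolve_avro_codec({"compression": "xz"},
                         {"spark.sql.avro.compression.codec": "deflate"}))  # xz
```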
- -## Supported types for Avro -> Spark SQL conversion -Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) under records of Avro. -<table class="table"> - <tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr> - <tr> - <td>boolean</td> - <td>BooleanType</td> - </tr> - <tr> - <td>int</td> - <td>IntegerType</td> - </tr> - <tr> - <td>long</td> - <td>LongType</td> - </tr> - <tr> - <td>float</td> - <td>FloatType</td> - </tr> - <tr> - <td>double</td> - <td>DoubleType</td> - </tr> - <tr> - <td>string</td> - <td>StringType</td> - </tr> - <tr> - <td>enum</td> - <td>StringType</td> - </tr> - <tr> - <td>fixed</td> - <td>BinaryType</td> - </tr> - <tr> - <td>bytes</td> - <td>BinaryType</td> - </tr> - <tr> - <td>record</td> - <td>StructType</td> - </tr> - <tr> - <td>array</td> - <td>ArrayType</td> - </tr> - <tr> - <td>map</td> - <td>MapType</td> - </tr> - <tr> - <td>union</td> - <td>See below</td> - </tr> -</table> - -In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types: - -1. `union(int, long)` will be mapped to LongType. -2. `union(float, double)` will be mapped to DoubleType. -3. `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. -All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. 
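
The union-resolution rules above can be sketched as a small resolver in plain Python. This is illustrative only; Spark's real converter operates on Avro `Schema` objects, and the names below (`resolve_union`, `resolve_single`) are invented for the sketch:

```python
def resolve_single(avro_type):
    # Subset of the primitive mapping table above.
    table = {"boolean": "BooleanType", "int": "IntegerType", "long": "LongType",
             "float": "FloatType", "double": "DoubleType", "string": "StringType",
             "bytes": "BinaryType"}
    return table[avro_type]

def resolve_union(members):
    """Map an Avro union (a list of member type names) to a Spark SQL
    type name, following the three documented basic-union rules."""
    # 1. union(int, long) -> LongType
    if sorted(members) == ["int", "long"]:
        return "LongType"
    # 2. union(float, double) -> DoubleType
    if sorted(members) == ["double", "float"]:
        return "DoubleType"
    # 3. union(something, null) -> type of `something`, nullable
    if "null" in members and len(members) == 2:
        other = next(m for m in members if m != "null")
        return resolve_single(other) + " (nullable)"
    # Anything else is a complex union: a struct with member0, member1, ...
    fields = ", ".join("member%d: %s" % (i, resolve_single(m))
                       for i, m in enumerate(members))
    return "StructType(" + fields + ")"

print(resolve_union(["int", "long"]))     # LongType
print(resolve_union(["string", "null"]))  # StringType (nullable)
print(resolve_union(["int", "string"]))   # StructType(member0: IntegerType, member1: StringType)
```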
- -It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types): - -<table class="table"> - <tr><th><b>Avro logical type</b></th><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr> - <tr> - <td>date</td> - <td>int</td> - <td>DateType</td> - </tr> - <tr> - <td>timestamp-millis</td> - <td>long</td> - <td>TimestampType</td> - </tr> - <tr> - <td>timestamp-micros</td> - <td>long</td> - <td>TimestampType</td> - </tr> - <tr> - <td>decimal</td> - <td>fixed</td> - <td>DecimalType</td> - </tr> - <tr> - <td>decimal</td> - <td>bytes</td> - <td>DecimalType</td> - </tr> -</table> -At the moment, it ignores docs, aliases and other properties present in the Avro file. - -## Supported types for Spark SQL -> Avro conversion -Spark supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are listed below: - -<table class="table"> -<tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr> - <tr> - <td>ByteType</td> - <td>int</td> - <td></td> - </tr> - <tr> - <td>ShortType</td> - <td>int</td> - <td></td> - </tr> - <tr> - <td>BinaryType</td> - <td>bytes</td> - <td></td> - </tr> - <tr> - <td>DateType</td> - <td>int</td> - <td>date</td> - </tr> - <tr> - <td>TimestampType</td> - <td>long</td> - <td>timestamp-micros</td> - </tr> - <tr> - <td>DecimalType</td> - <td>fixed</td> - <td>decimal</td> - </tr> -</table> - -You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. 
The following conversions are not applied by default and require user specified Avro schema: - -<table class="table"> - <tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr> - <tr> - <td>BinaryType</td> - <td>fixed</td> - <td></td> - </tr> - <tr> - <td>StringType</td> - <td>enum</td> - <td></td> - </tr> - <tr> - <td>TimestampType</td> - <td>long</td> - <td>timestamp-millis</td> - </tr> - <tr> - <td>DecimalType</td> - <td>bytes</td> - <td>decimal</td> - </tr> -</table> http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/ml-pipeline.md ---------------------------------------------------------------------- diff --git a/docs/ml-pipeline.md b/docs/ml-pipeline.md index e22e900..8c01ccb 100644 --- a/docs/ml-pipeline.md +++ b/docs/ml-pipeline.md @@ -57,7 +57,7 @@ E.g., a learning algorithm is an `Estimator` which trains on a `DataFrame` and p Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. This API adopts the `DataFrame` from Spark SQL in order to support a variety of data types. -`DataFrame` supports many basic and structured types; see the [Spark SQL datatype reference](sql-programming-guide.html#data-types) for a list of supported types. +`DataFrame` supports many basic and structured types; see the [Spark SQL datatype reference](sql-reference.html#data-types) for a list of supported types. In addition to the types listed in the Spark SQL guide, `DataFrame` can use ML [`Vector`](mllib-data-types.html#local-vector) types. A `DataFrame` can be created either implicitly or explicitly from a regular `RDD`. See the code examples below and the [Spark SQL programming guide](sql-programming-guide.html) for examples. 
http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sparkr.md ---------------------------------------------------------------------- diff --git a/docs/sparkr.md b/docs/sparkr.md index b4248e8..d9963b7 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -104,7 +104,7 @@ The following Spark driver properties can be set in `sparkConfig` with `sparkR.s </div> ## Creating SparkDataFrames -With a `SparkSession`, applications can create `SparkDataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources). +With a `SparkSession`, applications can create `SparkDataFrame`s from a local R data frame, from a [Hive table](sql-data-sources-hive-tables.html), or from other [data sources](sql-data-sources.html). ### From local data frames The simplest way to create a data frame is to convert a local R data frame into a SparkDataFrame. Specifically, we can use `as.DataFrame` or `createDataFrame` and pass in the local R data frame to create a SparkDataFrame. As an example, the following creates a `SparkDataFrame` based using the `faithful` dataset from R. @@ -125,7 +125,7 @@ head(df) ### From Data Sources -SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources. +SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-data-sources-load-save-functions.html#manually-specifying-options) that are available for the built-in data sources. 
The general method for creating SparkDataFrames from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active SparkSession will be used automatically. SparkR supports reading JSON, CSV and Parquet files natively, and through packages available from sources like [Third Party Projects](https://spark.apache.org/third-party-projects.html), you can find data source connectors for popular file formats like Avro. These packages can either be added by @@ -180,7 +180,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite" ### From Hive tables -You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support) and more details can be found in the [SQL programming guide](sql-programming-guide.html#starting-point-sparksession). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`). +You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support) and more details can be found in the [SQL programming guide](sql-getting-started.html#starting-point-sparksession). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`). 
<div data-lang="r" markdown="1"> {% highlight r %} http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-avro.md ---------------------------------------------------------------------- diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md new file mode 100644 index 0000000..d3b81f0 --- /dev/null +++ b/docs/sql-data-sources-avro.md @@ -0,0 +1,380 @@ +--- +layout: global +title: Apache Avro Data Source Guide +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +Since the Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. + +## Deploying +The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. + +As with any Spark application, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` +and its dependencies can be directly added to `spark-submit` using `--packages`, such as, + + ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +For experimenting in `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly, + + ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Load and Save Functions + +Since the `spark-avro` module is external, there is no `.avro` API in +`DataFrameReader` or `DataFrameWriter`. + +To load/save data in Avro format, you need to specify the data source option `format` as `avro` (or `org.apache.spark.sql.avro`). 
+<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") +usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} +</div> +<div data-lang="java" markdown="1"> +{% highlight java %} + +Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); +usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); + +{% endhighlight %} +</div> +<div data-lang="python" markdown="1"> +{% highlight python %} + +df = spark.read.format("avro").load("examples/src/main/resources/users.avro") +df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} +</div> +<div data-lang="r" markdown="1"> +{% highlight r %} + +df <- read.df("examples/src/main/resources/users.avro", "avro") +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + +{% endhighlight %} +</div> +</div> + +## to_avro() and from_avro() +The Avro package provides the function `to_avro()` to encode a column as binary in Avro +format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column to +another column, and the input/output SQL data type can be a complex or primitive type. + +Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each +Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. +* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. +* `to_avro()` can be used to turn structs into Avro records. 
This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. + +Both functions are currently only available in Scala and Java. + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.spark.sql.avro._ + +// `from_avro` requires Avro schema in JSON string format. +val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) + +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. +val output = df + .select(from_avro('value, jsonFormatSchema) as 'user) + .where("user.favorite_color == \"red\"") + .select(to_avro($"user.name") as 'value) + +val query = output + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic2") + .start() + +{% endhighlight %} +</div> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.spark.sql.avro.*; + +// `from_avro` requires Avro schema in JSON string format. +String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))); + +Dataset<Row> df = spark + .readStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load(); + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. 
+Dataset<Row> output = df
+  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_avro(col("user.name")).as("value"));
+
+StreamingQuery query = output
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start();
+
+{% endhighlight %}
+</div>
+</div>
+
+## Data Source Option
+
+Data source options for Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`.
+<table class="table">
+  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
+  <tr>
+    <td><code>avroSchema</code></td>
+    <td>None</td>
+    <td>Optional Avro schema provided by a user in JSON format. The data type and naming of record fields
+    should match the input Avro data or Catalyst data; otherwise, the read/write action will fail.</td>
+    <td>read and write</td>
+  </tr>
+  <tr>
+    <td><code>recordName</code></td>
+    <td>topLevelRecord</td>
+    <td>Top-level record name in the write result, which is required by the Avro spec.</td>
+    <td>write</td>
+  </tr>
+  <tr>
+    <td><code>recordNamespace</code></td>
+    <td>""</td>
+    <td>Record namespace in the write result.</td>
+    <td>write</td>
+  </tr>
+  <tr>
+    <td><code>ignoreExtension</code></td>
+    <td>true</td>
+    <td>This option controls ignoring of files without <code>.avro</code> extensions on read.<br> If the option is enabled, all files (with and without the <code>.avro</code> extension) are loaded.</td>
+    <td>read</td>
+  </tr>
+  <tr>
+    <td><code>compression</code></td>
+    <td>snappy</td>
+    <td>The <code>compression</code> option allows specifying a compression codec used on write.<br>
+    Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> is taken into account.</td>
+    <td>write</td>
+  </tr>
+</table>
+
+## Configuration
+Configuration of Avro can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL.
+<table class="table">
+  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
+  <tr>
+    <td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td>
+    <td>true</td>
+    <td>If it is set to true, the data source provider <code>com.databricks.spark.avro</code> is mapped to the built-in but external Avro data source module for backward compatibility.</td>
+  </tr>
+  <tr>
+    <td>spark.sql.avro.compression.codec</td>
+    <td>snappy</td>
+    <td>Compression codec used in writing of Avro files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. The default codec is snappy.</td>
+  </tr>
+  <tr>
+    <td>spark.sql.avro.deflate.level</td>
+    <td>-1</td>
+    <td>Compression level for the deflate codec used in writing of Avro files. Valid values must be in the range from 1 to 9 inclusive, or -1. The default value is -1, which corresponds to level 6 in the current implementation.</td>
+  </tr>
+</table>
+
+## Compatibility with Databricks spark-avro
+This Avro data source module is originally from and compatible with Databricks's open source repository
+[spark-avro](https://github.com/databricks/spark-avro).
+
+By default with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is
+mapped to this built-in Avro module. For Spark tables created with the `Provider` property set to `com.databricks.spark.avro` in
+the catalog meta store, the mapping is essential for loading these tables if you are using this built-in Avro module.
+
+Note that in Databricks's [spark-avro](https://github.com/databricks/spark-avro), the implicit classes
+`AvroDataFrameWriter` and `AvroDataFrameReader` were created for the shortcut function `.avro()`. In this
+built-in but external module, both implicit classes are removed.
Please use `.format("avro")` on
+`DataFrameWriter` or `DataFrameReader` instead.
+
+If you prefer using your own build of the `spark-avro` jar, you can simply disable the configuration
+`spark.sql.legacy.replaceDatabricksSparkAvro.enabled` and use the `--jars` option when deploying your
+applications. Read the [Advanced Dependency Management](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) section of the Application
+Submission Guide for more details.
+
+## Supported types for Avro -> Spark SQL conversion
+Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) under records of Avro.
+<table class="table">
+  <tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
+  <tr>
+    <td>boolean</td>
+    <td>BooleanType</td>
+  </tr>
+  <tr>
+    <td>int</td>
+    <td>IntegerType</td>
+  </tr>
+  <tr>
+    <td>long</td>
+    <td>LongType</td>
+  </tr>
+  <tr>
+    <td>float</td>
+    <td>FloatType</td>
+  </tr>
+  <tr>
+    <td>double</td>
+    <td>DoubleType</td>
+  </tr>
+  <tr>
+    <td>string</td>
+    <td>StringType</td>
+  </tr>
+  <tr>
+    <td>enum</td>
+    <td>StringType</td>
+  </tr>
+  <tr>
+    <td>fixed</td>
+    <td>BinaryType</td>
+  </tr>
+  <tr>
+    <td>bytes</td>
+    <td>BinaryType</td>
+  </tr>
+  <tr>
+    <td>record</td>
+    <td>StructType</td>
+  </tr>
+  <tr>
+    <td>array</td>
+    <td>ArrayType</td>
+  </tr>
+  <tr>
+    <td>map</td>
+    <td>MapType</td>
+  </tr>
+  <tr>
+    <td>union</td>
+    <td>See below</td>
+  </tr>
+</table>
+
+In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types:
+
+1. `union(int, long)` will be mapped to LongType.
+2. `union(float, double)` will be mapped to DoubleType.
+3. `union(something, null)`, where something is any supported Avro type.
This will be mapped to the same Spark SQL type as that of something, with nullable set to true. +All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. + +It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types): + +<table class="table"> + <tr><th><b>Avro logical type</b></th><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr> + <tr> + <td>date</td> + <td>int</td> + <td>DateType</td> + </tr> + <tr> + <td>timestamp-millis</td> + <td>long</td> + <td>TimestampType</td> + </tr> + <tr> + <td>timestamp-micros</td> + <td>long</td> + <td>TimestampType</td> + </tr> + <tr> + <td>decimal</td> + <td>fixed</td> + <td>DecimalType</td> + </tr> + <tr> + <td>decimal</td> + <td>bytes</td> + <td>DecimalType</td> + </tr> +</table> +At the moment, it ignores docs, aliases and other properties present in the Avro file. + +## Supported types for Spark SQL -> Avro conversion +Spark supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. 
IntegerType gets converted to int); however, there are a few special cases which are listed below: + +<table class="table"> +<tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr> + <tr> + <td>ByteType</td> + <td>int</td> + <td></td> + </tr> + <tr> + <td>ShortType</td> + <td>int</td> + <td></td> + </tr> + <tr> + <td>BinaryType</td> + <td>bytes</td> + <td></td> + </tr> + <tr> + <td>DateType</td> + <td>int</td> + <td>date</td> + </tr> + <tr> + <td>TimestampType</td> + <td>long</td> + <td>timestamp-micros</td> + </tr> + <tr> + <td>DecimalType</td> + <td>fixed</td> + <td>decimal</td> + </tr> +</table> + +You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require user specified Avro schema: + +<table class="table"> + <tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr> + <tr> + <td>BinaryType</td> + <td>fixed</td> + <td></td> + </tr> + <tr> + <td>StringType</td> + <td>enum</td> + <td></td> + </tr> + <tr> + <td>TimestampType</td> + <td>long</td> + <td>timestamp-millis</td> + </tr> + <tr> + <td>DecimalType</td> + <td>bytes</td> + <td>decimal</td> + </tr> +</table> http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-hive-tables.md ---------------------------------------------------------------------- diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md new file mode 100644 index 0000000..687e6f8 --- /dev/null +++ b/docs/sql-data-sources-hive-tables.md @@ -0,0 +1,166 @@ +--- +layout: global +title: Hive Tables +displayTitle: Hive Tables +--- + +* Table of contents +{:toc} + +Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). 
+However, since Hive has a large number of dependencies, these dependencies are not included in the
+default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them
+automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as
+they will need access to the Hive serialization and deserialization libraries (SerDes) in order to
+access data stored in Hive.
+
+Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` (for security configuration),
+and `hdfs-site.xml` (for HDFS configuration) files in `conf/`.
+
+When working with Hive, one must instantiate `SparkSession` with Hive support, including
+connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.
+Users who do not have an existing Hive deployment can still enable Hive support. When not configured
+by `hive-site.xml`, the context automatically creates `metastore_db` in the current directory and
+creates a directory configured by `spark.sql.warehouse.dir`, which defaults to the directory
+`spark-warehouse` in the current directory in which the Spark application is started. Note that
+the `hive.metastore.warehouse.dir` property in `hive-site.xml` has been deprecated since Spark 2.0.0.
+Instead, use `spark.sql.warehouse.dir` to specify the default location of databases in the warehouse.
+You may need to grant write privileges to the user who starts the Spark application.
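The lookup order described above can be sketched in plain Python. This is an illustrative sketch only, not Spark's actual resolution code, and the helper name `resolve_warehouse_dir` is hypothetical:

```python
import os

def resolve_warehouse_dir(spark_conf, hive_site):
    """Sketch of the warehouse-location precedence described above:
    spark.sql.warehouse.dir wins when set; the deprecated
    hive.metastore.warehouse.dir is only a fallback; otherwise the
    default is ./spark-warehouse in the application's start directory."""
    if "spark.sql.warehouse.dir" in spark_conf:
        return spark_conf["spark.sql.warehouse.dir"]
    if "hive.metastore.warehouse.dir" in hive_site:  # deprecated since Spark 2.0.0
        return hive_site["hive.metastore.warehouse.dir"]
    return os.path.join(os.getcwd(), "spark-warehouse")
```

For example, with both settings present, `resolve_warehouse_dir` ignores the Hive value, which mirrors why `spark.sql.warehouse.dir` is the setting you should configure.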
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example spark_hive scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example spark_hive java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example spark_hive python/sql/hive.py %}
+</div>
+
+<div data-lang="r" markdown="1">
+
+When working with Hive, one must instantiate `SparkSession` with Hive support. This
+adds support for finding tables in the MetaStore and writing queries using HiveQL.
+
+{% include_example spark_hive r/RSparkSQLExample.R %}
+
+</div>
+</div>
+
+### Specifying storage format for Hive tables
+
+When you create a Hive table, you need to define how this table should read/write data from/to the file system,
+i.e. the "input format" and "output format". You also need to define how this table should deserialize the data
+to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage
+format ("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`.
+By default, we will read the table files as plain text. Note that the Hive storage handler is not yet supported when
+creating a table; you can create a table using a storage handler on the Hive side and use Spark SQL to read it.
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>fileFormat</code></td>
+    <td>
+      A fileFormat is a package of storage format specifications, including "serde", "input format" and
+      "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>inputFormat, outputFormat</code></td>
+    <td>
+      These two options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
+      e.g.
`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These two options must be specified as a pair, and you cannot
+      specify them if you have already specified the `fileFormat` option.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>serde</code></td>
+    <td>
+      This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
+      if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
+      don't include the serde information, so you can use this option with these three fileFormats.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
+    <td>
+      These options can only be used with the "textfile" fileFormat. They define how to read delimited files into rows.
+    </td>
+  </tr>
+</table>
+
+All other properties defined with `OPTIONS` will be regarded as Hive serde properties.
+
+### Interacting with Different Versions of Hive Metastore
+
+One of the most important pieces of Spark SQL's Hive support is interaction with the Hive metastore,
+which enables Spark SQL to access metadata of Hive tables. Starting from Spark 1.4.0, a single binary
+build of Spark SQL can be used to query different versions of Hive metastores, using the configuration described below.
+Note that independent of the version of Hive that is being used to talk to the metastore, internally Spark SQL
+will compile against Hive 1.2.1 and use those classes for internal execution (serdes, UDFs, UDAFs, etc.).
+
+The following options can be used to configure the version of Hive that is used to retrieve metadata:
+
+<table class="table">
+  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>spark.sql.hive.metastore.version</code></td>
+    <td><code>1.2.1</code></td>
+    <td>
+      Version of the Hive metastore. Available
+      options are <code>0.12.0</code> through <code>2.3.3</code>.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.hive.metastore.jars</code></td>
+    <td><code>builtin</code></td>
+    <td>
+      Location of the jars that should be used to instantiate the HiveMetastoreClient. This
+      property can be one of three options:
+      <ol>
+        <li><code>builtin</code><br/>
+        Use Hive 1.2.1, which is bundled with the Spark assembly when <code>-Phive</code> is
+        enabled. When this option is chosen, <code>spark.sql.hive.metastore.version</code> must be
+        either <code>1.2.1</code> or not defined.</li>
+        <li><code>maven</code><br/>
+        Use Hive jars of the specified version downloaded from Maven repositories. This configuration
+        is not generally recommended for production deployments.</li>
+        <li>A classpath in the standard format for the JVM. This classpath must include all of Hive
+        and its dependencies, including the correct version of Hadoop. These jars only need to be
+        present on the driver, but if you are running in YARN cluster mode then you must ensure
+        they are packaged with your application.</li>
+      </ol>
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.hive.metastore.sharedPrefixes</code></td>
+    <td><code>com.mysql.jdbc,<br/>org.postgresql,<br/>com.microsoft.sqlserver,<br/>oracle.jdbc</code></td>
+    <td>
+      <p>
+        A comma-separated list of class prefixes that should be loaded using the classloader that is
+        shared between Spark SQL and a specific version of Hive. An example of classes that should
+        be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+        to be shared are those that interact with classes that are already shared, for example,
+        custom appenders that are used by log4j.
+      </p>
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.hive.metastore.barrierPrefixes</code></td>
+    <td><code>(empty)</code></td>
+    <td>
+      <p>
+        A comma-separated list of class prefixes that should explicitly be reloaded for each version
+        of Hive that Spark SQL is communicating with.
For example, Hive UDFs that are declared in a
+        prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).
+      </p>
+    </td>
+  </tr>
+</table>

http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-jdbc.md
----------------------------------------------------------------------
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
new file mode 100644
index 0000000..057e821
--- /dev/null
+++ b/docs/sql-data-sources-jdbc.md
@@ -0,0 +1,223 @@
+---
+layout: global
+title: JDBC To Other Databases
+displayTitle: JDBC To Other Databases
+---
+
+* Table of contents
+{:toc}
+
+Spark SQL also includes a data source that can read data from other databases using JDBC. This
+functionality should be preferred over using [JdbcRDD](api/scala/index.html#org.apache.spark.rdd.JdbcRDD),
+because the results are returned
+as a DataFrame and can easily be processed in Spark SQL or joined with other data sources.
+The JDBC data source is also easier to use from Java or Python, as it does not require the user to
+provide a ClassTag.
+(Note that this is different from the Spark SQL JDBC server, which allows other applications to
+run queries using Spark SQL.)
+
+To get started, you will need to include the JDBC driver for your particular database on the
+Spark classpath. For example, to connect to PostgreSQL from the Spark shell you would run the
+following command:
+
+{% highlight bash %}
+bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
+{% endhighlight %}
+
+Tables from the remote database can be loaded as a DataFrame or Spark SQL temporary view using
+the Data Sources API. Users can specify the JDBC connection properties in the data source options.
+<code>user</code> and <code>password</code> are normally provided as connection properties for
+logging into the data sources.
In addition to the connection properties, Spark also supports
+the following case-insensitive options:
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>url</code></td>
+    <td>
+      The JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g., <code>jdbc:postgresql://localhost/test?user=fred&password=secret</code>.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>dbtable</code></td>
+    <td>
+      The JDBC table that should be read from or written into. Note that when using it in the read
+      path, anything that is valid in a <code>FROM</code> clause of a SQL query can be used.
+      For example, instead of a full table you could also use a subquery in parentheses. It is not
+      allowed to specify the `dbtable` and `query` options at the same time.
+    </td>
+  </tr>
+  <tr>
+    <td><code>query</code></td>
+    <td>
+      A query that will be used to read data into Spark. The specified query will be parenthesized and used
+      as a subquery in the <code>FROM</code> clause. Spark will also assign an alias to the subquery clause.
+      As an example, Spark will issue a query of the following form to the JDBC source.<br><br>
+      <code> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias</code><br><br>
+      Below are a couple of restrictions when using this option.<br>
+      <ol>
+        <li> It is not allowed to specify the `dbtable` and `query` options at the same time. </li>
+        <li> It is not allowed to specify the `query` and `partitionColumn` options at the same time. When the
+        `partitionColumn` option is required, the subquery can be specified using the `dbtable` option instead, and
+        partition columns can be qualified using the subquery alias provided as part of `dbtable`.
<br>
+        Example:<br>
+        <code>
+          spark.read.format("jdbc")<br>
+            .option("dbtable", "(select c1, c2 from t1) as subq")<br>
+            .option("partitionColumn", "subq.c1")<br>
+            .load()
+        </code></li>
+      </ol>
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>driver</code></td>
+    <td>
+      The class name of the JDBC driver to use to connect to this URL.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>partitionColumn, lowerBound, upperBound</code></td>
+    <td>
+      These options must all be specified if any of them is specified. In addition,
+      <code>numPartitions</code> must be specified. They describe how to partition the table when
+      reading in parallel from multiple workers.
+      <code>partitionColumn</code> must be a numeric, date, or timestamp column from the table in question.
+      Notice that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
+      partition stride, not to filter the rows in the table. So all rows in the table will be
+      partitioned and returned. This option applies only to reading.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>numPartitions</code></td>
+    <td>
+      The maximum number of partitions that can be used for parallelism in table reading and
+      writing. This also determines the maximum number of concurrent JDBC connections.
+      If the number of partitions to write exceeds this limit, we decrease it to this limit by
+      calling <code>coalesce(numPartitions)</code> before writing.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>queryTimeout</code></td>
+    <td>
+      The number of seconds the driver will wait for a Statement object to execute. Zero means
+      there is no limit. In the write path, this option depends on
+      how JDBC drivers implement the API <code>setQueryTimeout</code>; e.g., the h2 JDBC driver
+      checks the timeout of each query instead of an entire JDBC batch.
+      It defaults to <code>0</code>.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>fetchsize</code></td>
+    <td>
+      The JDBC fetch size, which determines how many rows to fetch per round trip.
This can help performance on JDBC drivers which default to a low fetch size (e.g., Oracle with 10 rows). This option applies only to reading.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>batchsize</code></td>
+    <td>
+      The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to <code>1000</code>.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>isolationLevel</code></td>
+    <td>
+      The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>sessionInitStatement</code></td>
+    <td>
+      After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>truncate</code></td>
+    <td>
+      This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. This option applies only to writing.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>cascadeTruncate</code></td>
+    <td>
+      This is a JDBC writer related option.
If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this option allows execution of a <code>TRUNCATE TABLE t CASCADE</code> (in the case of PostgreSQL, a <code>TRUNCATE TABLE ONLY t CASCADE</code> is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified by <code>isCascadeTruncate</code> in each JDBCDialect.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>createTableOptions</code></td>
+    <td>
+      This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB</code>). This option applies only to writing.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>createTableColumnTypes</code></td>
+    <td>
+      The database column data types to use instead of the defaults when creating the table. Data type information should be specified in the same format as the CREATE TABLE columns syntax (e.g., <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified types should be valid Spark SQL data types. This option applies only to writing.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>customSchema</code></td>
+    <td>
+      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>pushDownPredicate</code></td>
+    <td>
+      The option to enable or disable predicate push-down into the JDBC data source.
The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source. + </td> + </tr> +</table> + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example jdbc_dataset scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example jdbc_dataset java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example jdbc_dataset python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +{% include_example jdbc_dataset r/RSparkSQLExample.R %} +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} + +CREATE TEMPORARY VIEW jdbcTable +USING org.apache.spark.sql.jdbc +OPTIONS ( + url "jdbc:postgresql:dbserver", + dbtable "schema.tablename", + user 'username', + password 'password' +) + +INSERT INTO TABLE jdbcTable +SELECT * FROM resultTable +{% endhighlight %} + +</div> +</div> http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-json.md ---------------------------------------------------------------------- diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md new file mode 100644 index 0000000..f84336b --- /dev/null +++ b/docs/sql-data-sources-json.md @@ -0,0 +1,81 @@ +--- +layout: global +title: JSON Files +displayTitle: JSON Files +--- + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +Spark SQL can automatically infer the schema of a JSON dataset and load it as a `Dataset[Row]`. +This conversion can be done using `SparkSession.read.json()` on either a `Dataset[String]`, +or a JSON file. 
+ +Note that the file that is offered as _a json file_ is not a typical JSON file. Each +line must contain a separate, self-contained valid JSON object. For more information, please see +[JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). + +For a regular multi-line JSON file, set the `multiLine` option to `true`. + +{% include_example json_dataset scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +Spark SQL can automatically infer the schema of a JSON dataset and load it as a `Dataset<Row>`. +This conversion can be done using `SparkSession.read().json()` on either a `Dataset<String>`, +or a JSON file. + +Note that the file that is offered as _a json file_ is not a typical JSON file. Each +line must contain a separate, self-contained valid JSON object. For more information, please see +[JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). + +For a regular multi-line JSON file, set the `multiLine` option to `true`. + +{% include_example json_dataset java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. +This conversion can be done using `SparkSession.read.json` on a JSON file. + +Note that the file that is offered as _a json file_ is not a typical JSON file. Each +line must contain a separate, self-contained valid JSON object. For more information, please see +[JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). + +For a regular multi-line JSON file, set the `multiLine` parameter to `True`. + +{% include_example json_dataset python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. 
This conversion can be done
+using the `read.json()` function, which loads data from a directory of JSON files where each line of the
+files is a JSON object.
+
+Note that the file that is offered as _a json file_ is not a typical JSON file. Each
+line must contain a separate, self-contained valid JSON object. For more information, please see
+[JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/).
+
+For a regular multi-line JSON file, set a named parameter `multiLine` to `TRUE`.
+
+{% include_example json_dataset r/RSparkSQLExample.R %}
+
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+
+CREATE TEMPORARY VIEW jsonTable
+USING org.apache.spark.sql.json
+OPTIONS (
+  path "examples/src/main/resources/people.json"
+)
+
+SELECT * FROM jsonTable
+
+{% endhighlight %}
+
+</div>
+
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-load-save-functions.md
----------------------------------------------------------------------
diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md
new file mode 100644
index 0000000..e1dd0a3
--- /dev/null
+++ b/docs/sql-data-sources-load-save-functions.md
@@ -0,0 +1,283 @@
+---
+layout: global
+title: Generic Load/Save Functions
+displayTitle: Generic Load/Save Functions
+---
+
+* Table of contents
+{:toc}
+
+
+In its simplest form, the default data source (`parquet` unless otherwise configured by
+`spark.sql.sources.default`) will be used for all operations.
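The fallback just described can be sketched in plain Python. This is an illustrative sketch only, not Spark's actual lookup code, and `resolve_format` is a hypothetical helper name:

```python
def resolve_format(conf, explicit_format=None):
    """Sketch of the default-source fallback described above: an explicit
    .format(...) choice wins, then the session configuration value
    spark.sql.sources.default, and finally the built-in "parquet"."""
    if explicit_format is not None:
        return explicit_format
    return conf.get("spark.sql.sources.default", "parquet")
```

So a bare `spark.read.load(...)` reads Parquet unless `spark.sql.sources.default` has been changed for the session.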
+ + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% include_example generic_load_save_functions scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example generic_load_save_functions java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> + +{% include_example generic_load_save_functions python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> + +{% include_example generic_load_save_functions r/RSparkSQLExample.R %} + +</div> +</div> + +### Manually Specifying Options + +You can also manually specify the data source that will be used along with any extra options +that you would like to pass to the data source. Data sources are specified by their fully qualified +name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you can also use their short +names (`json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`). DataFrames loaded from any data +source type can be converted into other types using this syntax. 
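The naming rule above can be sketched in plain Python. The mapping is illustrative only (Spark actually discovers sources through its `DataSourceRegister` service-loader mechanism, not a hard-coded table), and `resolve_source_name` is a hypothetical helper:

```python
# Short names of the built-in sources listed above.
BUILTIN_SHORT_NAMES = {"json", "parquet", "jdbc", "orc", "libsvm", "csv", "text"}

def resolve_source_name(name):
    """Return a built-in short name in canonical lower case, and pass
    anything else through unchanged as a fully qualified class name."""
    lowered = name.lower()
    return lowered if lowered in BUILTIN_SHORT_NAMES else name
```

For example, `"JSON"` resolves to the built-in short name `"json"`, while `"org.apache.spark.sql.parquet"` is treated as a fully qualified name.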
+ +To load a JSON file you can use: + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% include_example manual_load_options scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example manual_load_options java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example manual_load_options python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +{% include_example manual_load_options r/RSparkSQLExample.R %} +</div> +</div> + +To load a CSV file you can use: + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% include_example manual_load_options_csv scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example manual_load_options_csv java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example manual_load_options_csv python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +{% include_example manual_load_options_csv r/RSparkSQLExample.R %} + +</div> +</div> + +### Run SQL on files directly + +Instead of using the read API to load a file into a DataFrame and querying it, you can also query that +file directly with SQL.
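For example, a Parquet file can be queried in place by qualifying the path with the source format (a sketch using the example dataset shipped with Spark):

{% highlight sql %}

SELECT * FROM parquet.`examples/src/main/resources/users.parquet`

{% endhighlight %}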
+ +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% include_example direct_sql scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example direct_sql java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example direct_sql python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +{% include_example direct_sql r/RSparkSQLExample.R %} + +</div> +</div> + +### Save Modes + +Save operations can optionally take a `SaveMode`, that specifies how to handle existing data if +present. It is important to realize that these save modes do not utilize any locking and are not +atomic. Additionally, when performing an `Overwrite`, the data will be deleted before writing out the +new data. + +<table class="table"> +<tr><th>Scala/Java</th><th>Any Language</th><th>Meaning</th></tr> +<tr> + <td><code>SaveMode.ErrorIfExists</code> (default)</td> + <td><code>"error" or "errorifexists"</code> (default)</td> + <td> + When saving a DataFrame to a data source, if data already exists, + an exception is expected to be thrown. + </td> +</tr> +<tr> + <td><code>SaveMode.Append</code></td> + <td><code>"append"</code></td> + <td> + When saving a DataFrame to a data source, if data/table already exists, + contents of the DataFrame are expected to be appended to existing data. + </td> +</tr> +<tr> + <td><code>SaveMode.Overwrite</code></td> + <td><code>"overwrite"</code></td> + <td> + Overwrite mode means that when saving a DataFrame to a data source, + if data/table already exists, existing data is expected to be overwritten by the contents of + the DataFrame. 
+ </td> +</tr> +<tr> + <td><code>SaveMode.Ignore</code></td> + <td><code>"ignore"</code></td> + <td> + Ignore mode means that when saving a DataFrame to a data source, if data already exists, + the save operation is expected not to save the contents of the DataFrame and not to + change the existing data. This is similar to a <code>CREATE TABLE IF NOT EXISTS</code> in SQL. + </td> +</tr> +</table> + +### Saving to Persistent Tables + +`DataFrames` can also be saved as persistent tables into the Hive metastore using the `saveAsTable` +command. Notice that an existing Hive deployment is not necessary to use this feature. Spark will create a +default local Hive metastore (using Derby) for you. Unlike the `createOrReplaceTempView` command, +`saveAsTable` will materialize the contents of the DataFrame and create a pointer to the data in the +Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as +long as you maintain your connection to the same metastore. A DataFrame for a persistent table can +be created by calling the `table` method on a `SparkSession` with the name of the table. + +For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the +`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped, +the custom table path will not be removed and the table data is still there. If no custom table path is +specified, Spark will write data to a default table path under the warehouse directory. When the table is +dropped, the default table path will be removed too. + +Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits: + +- Since the metastore can return only necessary partitions for a query, discovering all the partitions on the first query to the table is no longer needed. +- Hive DDLs such as `ALTER TABLE PARTITION ...
SET LOCATION` are now available for tables created with the Datasource API. + +Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`. + +### Bucketing, Sorting and Partitioning + +For file-based data sources, it is also possible to bucket and sort or partition the output. +Bucketing and sorting are applicable only to persistent tables: + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example write_sorting_and_bucketing python/sql/datasource.py %} +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} + +CREATE TABLE users_bucketed_by_name( + name STRING, + favorite_color STRING, + favorite_numbers array<integer> +) USING parquet +CLUSTERED BY(name) INTO 42 BUCKETS; + +{% endhighlight %} + +</div> + +</div> + +while partitioning can be used with both `save` and `saveAsTable` when using the Dataset APIs.
+ + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example write_partitioning python/sql/datasource.py %} +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} + +CREATE TABLE users_by_favorite_color( + name STRING, + favorite_color STRING, + favorite_numbers array<integer> +) USING csv PARTITIONED BY(favorite_color); + +{% endhighlight %} + +</div> + +</div> + +It is possible to use both partitioning and bucketing for a single table: + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example write_partition_and_bucket python/sql/datasource.py %} +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} + +CREATE TABLE users_bucketed_and_partitioned( + name STRING, + favorite_color STRING, + favorite_numbers array<integer> +) USING parquet +PARTITIONED BY (favorite_color) +CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS; + +{% endhighlight %} + +</div> + +</div> + +`partitionBy` creates a directory structure as described in the [Partition Discovery](sql-data-sources-parquet.html#partition-discovery) section. +Thus, it has limited applicability to columns with high cardinality. In contrast + `bucketBy` distributes +data across a fixed number of buckets and can be used when a number of unique values is unbounded. 
http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-orc.md ---------------------------------------------------------------------- diff --git a/docs/sql-data-sources-orc.md b/docs/sql-data-sources-orc.md new file mode 100644 index 0000000..ef07d2f --- /dev/null +++ b/docs/sql-data-sources-orc.md @@ -0,0 +1,26 @@ +--- +layout: global +title: ORC Files +displayTitle: ORC Files +--- + +Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. +To enable it, the following configurations were added. The vectorized reader is used for the +native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` +is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC +serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), +the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`. + +<table class="table"> + <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr> + <tr> + <td><code>spark.sql.orc.impl</code></td> + <td><code>native</code></td> + <td>The name of the ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4. <code>hive</code> means the ORC library in Hive 1.2.1.</td> + </tr> + <tr> + <td><code>spark.sql.orc.enableVectorizedReader</code></td> + <td><code>true</code></td> + <td>Enables vectorized ORC decoding in the <code>native</code> implementation. If <code>false</code>, a non-vectorized ORC reader is used in the <code>native</code> implementation.
For <code>hive</code> implementation, this is ignored.</td> + </tr> +</table> http://git-wip-us.apache.org/repos/asf/spark/blob/71535516/docs/sql-data-sources-parquet.md ---------------------------------------------------------------------- diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md new file mode 100644 index 0000000..4fed3ea --- /dev/null +++ b/docs/sql-data-sources-parquet.md @@ -0,0 +1,321 @@ +--- +layout: global +title: Parquet Files +displayTitle: Parquet Files +--- + +* Table of contents +{:toc} + +[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. +Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema +of the original data. When writing Parquet files, all columns are automatically converted to be nullable for +compatibility reasons. + +### Loading Data Programmatically + +Using the data from the above example: + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> + +{% include_example basic_parquet_example python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> + +{% include_example basic_parquet_example r/RSparkSQLExample.R %} + +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} + +CREATE TEMPORARY VIEW parquetTable +USING org.apache.spark.sql.parquet +OPTIONS ( + path "examples/src/main/resources/people.parquet" +) + +SELECT * FROM parquetTable + +{% endhighlight %} + +</div> + +</div> + +### Partition Discovery + +Table partitioning is a common optimization approach used in systems like Hive. 
In a partitioned +table, data are usually stored in different directories, with partitioning column values encoded in +the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet) +are able to discover and infer partitioning information automatically. +For example, we can store all our previously used +population data into a partitioned table using the following directory structure, with two extra +columns, `gender` and `country`, as partitioning columns: + +{% highlight text %} + +path +└── to +    └── table +        ├── gender=male +        │   ├── ... +        │   │ +        │   ├── country=US +        │   │   └── data.parquet +        │   ├── country=CN +        │   │   └── data.parquet +        │   └── ... +        └── gender=female +            ├── ... +            │ +            ├── country=US +            │   └── data.parquet +            ├── country=CN +            │   └── data.parquet +            └── ... + +{% endhighlight %} + +By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL +will automatically extract the partitioning information from the paths. +Now the schema of the returned DataFrame becomes: + +{% highlight text %} + +root +|-- name: string (nullable = true) +|-- age: long (nullable = true) +|-- gender: string (nullable = true) +|-- country: string (nullable = true) + +{% endhighlight %} + +Notice that the data types of the partitioning columns are automatically inferred. Currently, +numeric data types, date, timestamp and string type are supported. Sometimes users may not want +to automatically infer the data types of the partitioning columns. For these use cases, the +automatic type inference can be configured by +`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type +inference is disabled, string type will be used for the partitioning columns. + +Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths +by default.
For the above example, if users pass `path/to/table/gender=male` to either +`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a +partitioning column. If users need to specify the base path that partition discovery +should start with, they can set `basePath` in the data source options. For example, +when `path/to/table/gender=male` is the path of the data and +users set `basePath` to `path/to/table/`, `gender` will be a partitioning column. + +### Schema Merging + +Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with +a simple schema, and gradually add more columns to the schema as needed. In this way, users may end +up with multiple Parquet files with different but mutually compatible schemas. The Parquet data +source is now able to automatically detect this case and merge schemas of all these files. + +Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we +turned it off by default starting from 1.5.0. You may enable it by + +1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the + examples below), or +2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`. 
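For instance, the global option can be toggled for the current session with a plain SQL command (a sketch; `SET` affects only the running session):

{% highlight sql %}

SET spark.sql.parquet.mergeSchema=true;

{% endhighlight %}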
+ +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> + +{% include_example schema_merging python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> + +{% include_example schema_merging r/RSparkSQLExample.R %} + +</div> + +</div> + +### Hive metastore Parquet table conversion + +When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own +Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the +`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by default. + +#### Hive/Parquet Schema Reconciliation + +There are two key differences between Hive and Parquet from the perspective of table schema +processing. + +1. Hive is case insensitive, while Parquet is not +1. Hive considers all columns nullable, while nullability in Parquet is significant + +Due to this reason, we must reconcile Hive metastore schema with Parquet schema when converting a +Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are: + +1. Fields that have the same name in both schema must have the same data type regardless of + nullability. The reconciled field should have the data type of the Parquet side, so that + nullability is respected. + +1. The reconciled schema contains exactly those fields defined in Hive metastore schema. + + - Any fields that only appear in the Parquet schema are dropped in the reconciled schema. + - Any fields that only appear in the Hive metastore schema are added as nullable field in the + reconciled schema. + +#### Metadata Refreshing + +Spark SQL caches Parquet metadata for better performance. 
When Hive metastore Parquet table +conversion is enabled, the metadata of those converted tables is also cached. If these tables are +updated by Hive or other external tools, you need to refresh them manually to ensure consistent +metadata. + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +// spark is an existing SparkSession +spark.catalog.refreshTable("my_table") +{% endhighlight %} + +</div> + +<div data-lang="java" markdown="1"> + +{% highlight java %} +// spark is an existing SparkSession +spark.catalog().refreshTable("my_table"); +{% endhighlight %} + +</div> + +<div data-lang="python" markdown="1"> + +{% highlight python %} +# spark is an existing SparkSession +spark.catalog.refreshTable("my_table") +{% endhighlight %} + +</div> + +<div data-lang="r" markdown="1"> + +{% highlight r %} +refreshTable("my_table") +{% endhighlight %} + +</div> + +<div data-lang="sql" markdown="1"> + +{% highlight sql %} +REFRESH TABLE my_table; +{% endhighlight %} + +</div> + +</div> + +### Configuration + +Configuration of Parquet can be done using the `setConf` method on `SparkSession` or by running +`SET key=value` commands using SQL. + +<table class="table"> +<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> +<tr> + <td><code>spark.sql.parquet.binaryAsString</code></td> + <td>false</td> + <td> + Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do + not differentiate between binary data and strings when writing out the Parquet schema. This + flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. + </td> +</tr> +<tr> + <td><code>spark.sql.parquet.int96AsTimestamp</code></td> + <td>true</td> + <td> + Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This + flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
+ </td> +</tr> +<tr> + <td><code>spark.sql.parquet.compression.codec</code></td> + <td>snappy</td> + <td> + Sets the compression codec used when writing Parquet files. If either <code>compression</code> or + <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be + <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include: + none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd. + Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and <code>brotli</code> requires + <code>BrotliCodec</code> to be installed. + </td> +</tr> +<tr> + <td><code>spark.sql.parquet.filterPushdown</code></td> + <td>true</td> + <td>Enables Parquet filter push-down optimization when set to true.</td> +</tr> +<tr> + <td><code>spark.sql.hive.convertMetastoreParquet</code></td> + <td>true</td> + <td> + When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built-in + support. + </td> +</tr> +<tr> + <td><code>spark.sql.parquet.mergeSchema</code></td> + <td>false</td> + <td> + <p> + When true, the Parquet data source merges schemas collected from all data files, otherwise the + schema is picked from the summary file or a random data file if no summary file is available. + </p> + </td> +</tr> +<tr> + <td><code>spark.sql.optimizer.metadataOnly</code></td> + <td>true</td> + <td> + <p> + When true, enable the metadata-only query optimization that uses the table's metadata to + produce the partition columns instead of table scans. It applies when all the columns scanned + are partition columns and the query has an aggregate operator that satisfies distinct + semantics. + </p> + </td> +</tr> +<tr> + <td><code>spark.sql.parquet.writeLegacyFormat</code></td> + <td>false</td> + <td> + If true, data will be written in the way of Spark 1.4 and earlier.
For example, decimal values + will be written in Apache Parquet's fixed-length byte array format, which other systems such as + Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For + example, decimals will be written in int-based format. If Parquet output is intended for use + with systems that do not support this newer format, set to true. + </td> +</tr> +</table>
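As a sketch, any of these properties can likewise be set per session from SQL, for example:

{% highlight sql %}

SET spark.sql.parquet.compression.codec=gzip;

{% endhighlight %}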