This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 840b07e [HUDI-589][DOCS] Fix querying_data page (#1333)
840b07e is described below
commit 840b07ee4452e1f0654a32cb32cd7bed3279edcf
Author: Bhavani Sudha Saktheeswaran
AuthorDate: Mon Mar 2 11:09:41 2020 -0800
[HUDI-589][DOCS] Fix querying_data page (#1333)
- Added support matrix for COW and MOR tables
- Change reference from (`views`|`pulls`) to `queries`
- And minor restructuring
---
docs/_docs/2_3_querying_data.md | 131 ++++++++++++++++++++++------------------
1 file changed, 73 insertions(+), 58 deletions(-)
diff --git a/docs/_docs/2_3_querying_data.md b/docs/_docs/2_3_querying_data.md
index 0ee5e17..1a2ae08 100644
--- a/docs/_docs/2_3_querying_data.md
+++ b/docs/_docs/2_3_querying_data.md
@@ -9,7 +9,7 @@ last_modified_at: 2019-12-30T15:59:57-04:00
Conceptually, Hudi stores data physically once on DFS, while providing 3
different ways of querying, as explained
[before](/docs/concepts.html#query-types).
Once the table is synced to the Hive metastore, it provides external Hive
tables backed by Hudi's custom inputformats. Once the proper hudi
-bundle has been provided, the table can be queried by popular query engines
like Hive, Spark and Presto.
+bundle has been provided, the table can be queried by popular query engines
like Hive, Spark SQL, Spark datasource and Presto.
Specifically, following Hive tables are registered based off [table
name](/docs/configurations.html#TABLE_NAME_OPT_KEY)
and [table type](/docs/configurations.html#TABLE_TYPE_OPT_KEY) passed during
write.
@@ -24,31 +24,49 @@ If `table name = hudi_trips` and `table type =
MERGE_ON_READ`, then we get:
As discussed in the concepts section, the one key primitive needed for
[incrementally
processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop),
-is `incremental pulls` (to obtain a change stream/log from a table). Hudi
tables can be pulled incrementally, which means you can get ALL and ONLY the
updated & new rows
-since a specified instant time. This, together with upserts, are particularly
useful for building data pipelines where 1 or more source Hudi tables are
incrementally pulled (streams/facts),
-joined with other tables (tables/dimensions), to [write out
deltas](/docs/writing_data.html) to a target Hudi table. Incremental view is
realized by querying one of the tables above,
+is obtaining a change stream/log from a table. Hudi tables can be queried
incrementally, which means you can get ALL and ONLY the updated & new rows
+since a specified instant time. This, together with upserts, is particularly
useful for building data pipelines where 1 or more source Hudi tables are
incrementally queried (streams/facts),
+joined with other tables (tables/dimensions), to [write out
deltas](/docs/writing_data.html) to a target Hudi table. Incremental queries
are realized by querying one of the tables above,
with special configurations that indicate to query planning that only
incremental data needs to be fetched out of the table.
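To make the "ALL and ONLY the updated & new rows" contract concrete, here is a conceptual model in plain Java. It is a sketch, not Hudi's implementation. It assumes three things: each row carries the instant that last wrote it (standing in for the `_hoodie_commit_time` column), instant times are fixed-width `yyyyMMddHHmmss` strings so that string order matches time order, and "since" means strictly after the given instant. The `IncrementalQuerySketch` class, its `Row` type and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model of incremental-query semantics; not Hudi's implementation.
final class IncrementalQuerySketch {

    // Hypothetical row: the instant that last wrote it, plus a record key.
    static final class Row {
        final String commitTime; // plays the role of the _hoodie_commit_time column
        final String key;

        Row(String commitTime, String key) {
            this.commitTime = commitTime;
            this.key = key;
        }
    }

    // Returns ALL and ONLY the rows written strictly after beginInstantTime.
    // Fixed-width yyyyMMddHHmmss instants sort correctly as plain strings.
    static List<Row> incrementalRows(List<Row> table, String beginInstantTime) {
        List<Row> changed = new ArrayList<>();
        for (Row row : table) {
            if (row.commitTime.compareTo(beginInstantTime) > 0) {
                changed.add(row);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Row> table = Arrays.asList(
                new Row("20200301090000", "trip-1"),  // written before the begin instant
                new Row("20200302100000", "trip-2"),  // written exactly at the begin instant
                new Row("20200302110000", "trip-3")); // written after it
        for (Row row : incrementalRows(table, "20200302100000")) {
            System.out.println(row.key + " @ " + row.commitTime);
        }
    }
}
```

Running `main` prints only `trip-3 @ 20200302110000`. Under the strictly-after assumption, the row written at the begin instant itself is excluded.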
-In sections, below we will discuss how to access these query types from
different query engines.
+
+## Support matrix
+
+### COPY_ON_WRITE tables
+
+|Query engine|Snapshot|Incremental|Read Optimized|
+|------------|--------|-----------|--------------|
+|**Hive**|Y|Y|N/A|
+|**Spark SQL**|Y|Y|N/A|
+|**Spark datasource**|Y|Y|N/A|
+|**Presto**|Y|N|N/A|
+
+### MERGE_ON_READ tables
+
+|Query engine|Snapshot|Incremental|Read Optimized|
+|------------|--------|-----------|--------------|
+|**Hive**|Y|Y|Y|
+|**Spark SQL**|Y|Y|Y|
+|**Spark datasource**|N|N|Y|
+|**Presto**|N|N|Y|
+
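For tooling that checks compatibility before submitting a query, the two matrices above can be kept as a lookup table. This is a minimal sketch. The enum and method names are hypothetical, the entries are copied from the tables, and N/A is treated as unsupported.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Lookup-table encoding of the support matrices; names are hypothetical.
final class SupportMatrix {
    enum TableType { COPY_ON_WRITE, MERGE_ON_READ }
    enum Engine { HIVE, SPARK_SQL, SPARK_DATASOURCE, PRESTO }
    enum QueryType { SNAPSHOT, INCREMENTAL, READ_OPTIMIZED }

    private static final Map<TableType, Map<Engine, Set<QueryType>>> MATRIX =
            new EnumMap<>(TableType.class);

    static {
        // COPY_ON_WRITE: read optimized is N/A, so it is never listed.
        Map<Engine, Set<QueryType>> cow = new EnumMap<>(Engine.class);
        cow.put(Engine.HIVE, EnumSet.of(QueryType.SNAPSHOT, QueryType.INCREMENTAL));
        cow.put(Engine.SPARK_SQL, EnumSet.of(QueryType.SNAPSHOT, QueryType.INCREMENTAL));
        cow.put(Engine.SPARK_DATASOURCE, EnumSet.of(QueryType.SNAPSHOT, QueryType.INCREMENTAL));
        cow.put(Engine.PRESTO, EnumSet.of(QueryType.SNAPSHOT));
        MATRIX.put(TableType.COPY_ON_WRITE, cow);

        Map<Engine, Set<QueryType>> mor = new EnumMap<>(Engine.class);
        mor.put(Engine.HIVE, EnumSet.allOf(QueryType.class));
        mor.put(Engine.SPARK_SQL, EnumSet.allOf(QueryType.class));
        mor.put(Engine.SPARK_DATASOURCE, EnumSet.of(QueryType.READ_OPTIMIZED));
        mor.put(Engine.PRESTO, EnumSet.of(QueryType.READ_OPTIMIZED));
        MATRIX.put(TableType.MERGE_ON_READ, mor);
    }

    static boolean isSupported(TableType table, Engine engine, QueryType query) {
        return MATRIX.get(table).get(engine).contains(query);
    }

    public static void main(String[] args) {
        System.out.println(isSupported(TableType.MERGE_ON_READ, Engine.PRESTO, QueryType.SNAPSHOT));
    }
}
```

`main` prints `false`, because the matrix lists only read optimized queries for Presto on MERGE_ON_READ tables.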
+
+In the sections below, we discuss the specific setup needed to access different query types from different query engines.
## Hive
-In order for Hive to recognize Hudi tables and query correctly, the
HiveServer2 needs to be provided with the
`hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar`
-in its [aux jars
path](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr).
This will ensure the input format
+In order for Hive to recognize Hudi tables and query them correctly:
+ - the HiveServer2 needs to be provided with the
`hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar` in its [aux jars
path](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr).
This will ensure the input format
classes and their dependencies are available for query planning & execution.
+ - For MERGE_ON_READ tables, the bundle additionally needs to be placed on the Hadoop/Hive installation across the cluster, so that queries can pick up the custom RecordReader as well.
-### Read optimized query
-In addition to setup above, for beeline cli access, the `hive.input.format`
variable needs to be set to the fully qualified path name of the
+In addition to the setup above, for beeline CLI access, the `hive.input.format` variable needs to be set to the fully qualified class name of the
inputformat `org.apache.hudi.hadoop.HoodieParquetInputFormat`. For Tez,
additionally the `hive.tez.input.format` needs to be set
-to `org.apache.hadoop.hive.ql.io.HiveInputFormat`
-
-### Snapshot query
-In addition to installing the hive bundle jar on the HiveServer2, it needs to
be put on the hadoop/hive installation across the cluster, so that
-queries can pick up the custom RecordReader as well.
+to `org.apache.hadoop.hive.ql.io.HiveInputFormat`. Then proceed to query the
table like any other Hive table.
### Incremental query
`HiveIncrementalPuller` allows incrementally extracting changes from large
fact/dimension tables via HiveQL, combining the benefits of Hive (reliably
process complex SQL queries) and
-incremental primitives (speed up query by pulling tables incrementally instead
of scanning fully). The tool uses Hive JDBC to run the hive query and saves its
results in a temp table.
+incremental primitives (speed up queries by reading tables incrementally instead of scanning them fully). The tool uses Hive JDBC to run the Hive query and saves its results in a temp table
that can later be upserted. Upsert utility (`HoodieDeltaStreamer`) has all the
state it needs from the directory structure to know what should be the commit
time on the target table.
e.g. `/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}`. The
Delta Hive table registered will be of the form
`{tmpdb}.{source_table}_{last_commit_included}`.
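The naming conventions above can be expressed as two small helpers. This sketch assumes that the `tmp` option (the DFS directory for temporary delta data) corresponds to the `/app/incremental-hql` prefix in the example path. The class and method names are hypothetical, and the tool may build these strings differently.

```java
// Hypothetical helpers mirroring the documented HiveIncrementalPuller layout.
final class IncrementalPullerNaming {

    // {tmp}/intermediate/{source_table_name}_temp/{last_commit_included}
    // (assumes the example's /app/incremental-hql prefix is the tmp option)
    static String intermediateDir(String tmpDir, String sourceTable, String lastCommitIncluded) {
        String base = tmpDir.endsWith("/") ? tmpDir.substring(0, tmpDir.length() - 1) : tmpDir;
        return base + "/intermediate/" + sourceTable + "_temp/" + lastCommitIncluded;
    }

    // {tmpdb}.{source_table}_{last_commit_included}
    static String deltaTableName(String tmpDb, String sourceTable, String lastCommitIncluded) {
        return tmpDb + "." + sourceTable + "_" + lastCommitIncluded;
    }

    public static void main(String[] args) {
        // Hypothetical source table and commit instant.
        System.out.println(intermediateDir("/app/incremental-hql", "trips", "20200302110941"));
        System.out.println(deltaTableName("hoodie_temp", "trips", "20200302110941"));
    }
}
```

With the default `tmpdb` of `hoodie_temp`, `main` prints `/app/incremental-hql/intermediate/trips_temp/20200302110941` and `hoodie_temp.trips_20200302110941`.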
@@ -63,16 +81,16 @@ The following are the configuration options for
HiveIncrementalPuller
|tmp| Directory where the temporary delta data is stored in DFS. The directory
structure will follow conventions. Please see the below section. | |
|extractSQLFile| The SQL to execute on the source table to extract the data.
The data extracted will be all the rows that changed since a particular point
in time. | |
|sourceTable| Source Table Name. Needed to set hive environment properties. |
|
+|sourceDb| Source DB name. Needed to set hive environment properties.| |
|targetTable| Target Table Name. Needed for the intermediate storage directory
structure. | |
-|sourceDataPath| Source DFS Base Path. This is where the Hudi metadata will be
read. | |
-|targetDataPath| Target DFS Base path. This is needed to compute the
fromCommitTime. This is not needed if fromCommitTime is specified explicitly. |
|
+|targetDb| Target table's DB name.| |
|tmpdb| The database in which the intermediate temp delta table will be created | hoodie_temp |
-|fromCommitTime| This is the most important parameter. This is the point in
time from which the changed records are pulled from. | |
-|maxCommits| Number of commits to include in the pull. Setting this to -1 will
include all the commits from fromCommitTime. Setting this to a value > 0, will
include records that ONLY changed in the specified number of commits after
fromCommitTime. This may be needed if you need to catch up say 2 commits at a
time. | 3 |
+|fromCommitTime| This is the most important parameter. This is the point in time from which changed records are queried. | |
+|maxCommits| Number of commits to include in the query. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0 will include ONLY the records that changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up, say, 2 commits at a time. | 3 |
|help| Utility Help | |
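The interplay of `fromCommitTime` and `maxCommits` can be modeled as a window over an ascending list of commit times. This is a simplified sketch, not the tool's code. It assumes that commits strictly after `fromCommitTime` are selected, treats any non-positive `maxCommits` as "no limit" (only -1 is documented), and uses hypothetical names and instants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of how fromCommitTime and maxCommits bound an incremental pull.
final class CommitWindow {

    // sortedTimeline must be in ascending order. Commits at or before
    // fromCommitTime are skipped; a non-positive maxCommits means "no limit".
    static List<String> selectCommits(List<String> sortedTimeline, String fromCommitTime, int maxCommits) {
        List<String> selected = new ArrayList<>();
        for (String commit : sortedTimeline) {
            if (commit.compareTo(fromCommitTime) <= 0) {
                continue; // already covered by an earlier pull
            }
            if (maxCommits > 0 && selected.size() == maxCommits) {
                break; // catch up at most maxCommits commits in this run
            }
            selected.add(commit);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList(
                "20200301000000", "20200302000000", "20200303000000", "20200304000000");
        // Catch up two commits at a time after the first commit.
        System.out.println(selectCommits(timeline, "20200301000000", 2));
        // fromCommitTime=0 and maxCommits=-1 select the whole timeline.
        System.out.println(selectCommits(timeline, "0", -1));
    }
}
```

`main` prints `[20200302000000, 20200303000000]` for the two-commit catch-up. For `fromCommitTime=0`, `maxCommits=-1` it prints all four commits, which matches the backfill setting described in the following paragraph.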
-Setting fromCommitTime=0 and maxCommits=-1 will pull in the entire source
table and can be used to initiate backfills. If the target table is a Hudi
table,
+Setting fromCommitTime=0 and maxCommits=-1 will fetch the entire source table
and can be used to initiate backfills. If the target table is a Hudi table,
then the utility can determine whether the target table has no commits or is behind by more than 24 hours (this is configurable);
if so, it will automatically use the backfill configuration, since applying the last
24 hours incrementally could take more time than doing a backfill. The current
limitation of the tool
is the lack of support for self-joining the same table in mixed mode (snapshot
and incremental modes).
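The automatic fallback to backfill described above comes down to a single check. In this hedged sketch, instant times are assumed to be `yyyyMMddHHmmss` strings, and the 24-hour threshold is passed as a parameter because it is configurable. The class and method names are hypothetical, not the tool's API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Sketch of the documented backfill rule, not the tool's actual code.
final class BackfillDecision {

    // Assumed instant format: yyyyMMddHHmmss, e.g. 20200302110941.
    private static final DateTimeFormatter INSTANT_FORMAT =
            DateTimeFormatter.ofPattern("uuuuMMddHHmmss");

    // lastTargetCommit is null when the target table has no commits yet.
    // Returns true when the backfill configuration should be used.
    static boolean useBackfill(String lastTargetCommit, LocalDateTime now, Duration maxLag) {
        if (lastTargetCommit == null) {
            return true; // nothing to apply incrementally on top of
        }
        LocalDateTime lastCommit = LocalDateTime.parse(lastTargetCommit, INSTANT_FORMAT);
        // More than maxLag behind: replaying every missed commit may be slower than a backfill.
        return Duration.between(lastCommit, now).compareTo(maxLag) > 0;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 3, 2, 11, 0);
        Duration lag = Duration.ofHours(24); // the threshold named in the text; configurable
        System.out.println(useBackfill(null, now, lag));             // no commits yet
        System.out.println(useBackfill("20200302090000", now, lag)); // 2 hours behind
        System.out.println(useBackfill("20200301090000", now, lag)); // 26 hours behind
    }
}
```

`main` prints `true`, `false`, `true`. A lag of exactly 24 hours does not trigger a backfill, since the rule only applies when the target is behind by more than the threshold.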
@@ -84,55 +102,45 @@ using the hive session property for incremental queries:
`set hive.fetch.task.co
would ensure Map Reduce execution is chosen for a Hive query, which combines
partitions (comma
separated) and calls InputFormat.listStatus() only once with all those
partitions.
-## Spark
-
-Spark provides much easier deployment & management of Hudi jars and bundles
into jobs/notebooks. At a high level, there are two ways to access Hudi tables
in Spark.
-
- - **Hudi DataSource** : Supports Read Optimized, Incremental Pulls similar to
how standard datasources (e.g: `spark.read.parquet`) work.
- - **Read as Hive tables** : Supports all three query types, including the
snapshot queries, relying on the custom Hudi input formats again like Hive.
+## Spark SQL
+Supports all query types across both Hudi table types, relying on the custom Hudi input formats, as Hive does.
+Notebook and spark-shell users typically use Spark SQL to query Hudi tables. Add the hudi-spark-bundle
+via `--jars` or `--packages`, as described in the Spark datasource section below.
- In general, your spark job needs a dependency to `hudi-spark` or
`hudi-spark-bundle_2.*-x.y.z.jar` needs to be on the class path of driver &
executors (hint: use `--jars` argument)
-
-### Read optimized query
-
-Pushing a path filter into sparkContext as follows allows for read optimized
querying of a Hudi hive table using SparkSQL.
-This method retains Spark built-in optimizations for reading Parquet files
like vectorized reading on Hudi tables.
-
-```scala
-spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter]);
-```
-
-If you prefer to glob paths on DFS via the datasource, you can simply do
something like below to get a Spark dataframe to work with.
+### Snapshot query {#spark-snapshot-query}
+By default, Spark SQL will try to use its own parquet reader instead of the Hive SerDe when reading from Hive metastore parquet tables.
+However, for MERGE_ON_READ tables, which have both parquet and avro data, this default needs to be turned off by setting `spark.sql.hive.convertMetastoreParquet=false`.
+This forces Spark to fall back to using the Hive SerDe to read the data (planning and execution are still done by Spark).
```java
-Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi")
-// pass any path glob, can include hudi & non-hudi tables
-.load("/glob/path/pattern");
+$ spark-shell --driver-class-path /etc/hive/conf --packages
org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
--conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10
--driver-memory 7g --executor-memory 2g --master yarn-client
+
+scala> sqlContext.sql("select count(*) from hudi_trips_mor_rt where datestr =
'2016-10-02'").show()
```
-
-### Snapshot query {#spark-snapshot-query}
-Currently, near-real time data can only be queried as a Hive table in Spark
using snapshot query mode. In order to do this, set
`spark.sql.hive.convertMetastoreParquet=false`, forcing Spark to fallback
-to using the Hive Serde to read the data (planning/executions is still Spark).
-```java
-$ spark-shell --jars hudi-spark-bundle_2.11-x.y.z-SNAPSHOT.jar
--driver-class-path /etc/hive/conf --packages
org.apache.spark:spark-avro_2.11:2.4.4 --conf
spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory
7g --executor-memory 2g --master yarn-client
+For COPY_ON_WRITE tables, either the Hive SerDe can be used by setting `spark.sql.hive.convertMetastoreParquet=false` as described above, or Spark's built-in support can be leveraged.
+If using Spark's built-in support, a path filter additionally needs to be pushed into the sparkContext as follows. This method retains Spark's built-in optimizations for reading parquet files, such as vectorized reading, on Hudi Hive tables.
-scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr =
'2016-10-02'").show()
-scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr =
'2016-10-02'").show()
+```scala
+spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter]);
```
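The choice between the two reading paths can be summarized as a function from table type to the settings to apply. In this minimal sketch with hypothetical names, the function returns one of two entries: the Spark SQL setting that forces the Hive SerDe, or the Hadoop configuration entry for the path filter. It rejects MERGE_ON_READ combined with Spark's parquet reader, because those tables also contain avro data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Encodes the Spark SQL snapshot-query guidance as a lookup; names are hypothetical.
final class SparkSqlReadSettings {

    enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

    // Returns the configuration entries to apply before querying.
    // useSparkParquetReader: keep Spark's built-in parquet reader (COPY_ON_WRITE only).
    static Map<String, String> settingsFor(TableType type, boolean useSparkParquetReader) {
        if (type == TableType.MERGE_ON_READ && useSparkParquetReader) {
            throw new IllegalArgumentException(
                    "MERGE_ON_READ tables hold parquet and avro data and need the Hive SerDe");
        }
        Map<String, String> conf = new LinkedHashMap<>();
        if (useSparkParquetReader) {
            // Hadoop configuration entry, set via spark.sparkContext.hadoopConfiguration
            conf.put("mapreduce.input.pathFilter.class",
                    "org.apache.hudi.hadoop.HoodieROTablePathFilter");
        } else {
            // Spark SQL setting that makes Spark fall back to the Hive SerDe
            conf.put("spark.sql.hive.convertMetastoreParquet", "false");
        }
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(settingsFor(TableType.MERGE_ON_READ, false));
        System.out.println(settingsFor(TableType.COPY_ON_WRITE, true));
    }
}
```

In a real job, the Spark SQL entry can be passed with `--conf`, as in the spark-shell command above. The path filter goes through `spark.sparkContext.hadoopConfiguration`, as in the Scala snippet.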
-### Incremental pulling {#spark-incr-pull}
-The `hudi-spark` module offers the DataSource API, a more elegant way to pull
data from Hudi table and process it via Spark.
-A sample incremental pull, that will obtain all records written since
`beginInstantTime`, looks like below.
+### Incremental querying {#spark-incr-query}
+Incremental queries work like Hive incremental queries. The `hudi-spark` module offers the DataSource API, a more elegant way to query data from a Hudi table and process it via Spark.
+A sample incremental query that obtains all records written since `beginInstantTime` looks like this:
```java
- Dataset<Row> hoodieIncViewDF = spark.read()
+ Dataset<Row> hudiIncQueryDF = spark.read()
.format("org.apache.hudi")
- .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),
- DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
+ DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
<beginInstantTime>)
- .load(tablePath); // For incremental view, pass in the root/base path of
table
+ .load(tablePath); // For incremental query, pass in the root/base path of the table
+
+hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental");
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show();
```
Please refer to [configurations](/docs/configurations.html#spark-datasource)
section, to view all datasource options.
@@ -145,11 +153,18 @@ Additionally, `HoodieReadClient` offers the following
functionality using Hudi's
| filterExists() | Filter out already existing records from the provided
RDD[HoodieRecord]. Useful for de-duplication |
| checkExists(keys) | Check if the provided keys exist in a Hudi table |
+## Spark datasource
+
+Hudi COPY_ON_WRITE tables can be queried via the Spark datasource, similar to how standard datasources work (e.g. `spark.read.parquet`).
+Both snapshot and incremental queries are supported here. Spark jobs typically need `--jars <path to jar>/hudi-spark-bundle_2.11-0.5.1-incubating.jar`
+to add the bundle to the classpath of the driver and executors. When using spark-shell, `--packages` can be used instead of `--jars` to fetch the hudi-spark-bundle: `--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating`.
+For examples, refer to [Setup spark-shell in
quickstart](/docs/quick-start-guide.html#setup-spark-shell).
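The `--jars` and `--packages` flags take differently shaped names for the same bundle. `--packages` expects a Maven `groupId:artifactId:version` coordinate, while the jar file on disk follows Maven's `artifactId-version.jar` convention. The sketch below derives both from the same parts. It assumes the bundle has no classifier, and the class and method names are hypothetical.

```java
// Maven naming conventions behind --packages and --jars; names are hypothetical.
final class BundleNames {

    // Coordinate accepted by --packages: groupId:artifactId:version
    static String packagesCoordinate(String groupId, String artifactId, String version) {
        return groupId + ":" + artifactId + ":" + version;
    }

    // File name of the main jar in a Maven repository (no classifier assumed)
    static String jarFileName(String artifactId, String version) {
        return artifactId + "-" + version + ".jar";
    }

    public static void main(String[] args) {
        String artifact = "hudi-spark-bundle_2.11"; // Scala 2.11 build of the bundle
        String version = "0.5.1-incubating";
        System.out.println("--packages " + packagesCoordinate("org.apache.hudi", artifact, version));
        System.out.println("--jars <path to jar>/" + jarFileName(artifact, version));
    }
}
```

`main` prints `--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating` and `--jars <path to jar>/hudi-spark-bundle_2.11-0.5.1-incubating.jar`.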
## Presto
-Presto is a popular query engine, providing interactive query performance.
Presto currently supports only read optimized queries on Hudi tables.
-This requires the `hudi-presto-bundle` jar to be placed into
`<presto_install>/plugin/hive-hadoop2/`, across the installation.
+Presto is a popular query engine, providing interactive query performance.
Presto currently supports snapshot queries on
+COPY_ON_WRITE and read optimized queries on MERGE_ON_READ Hudi tables. This
requires the `hudi-presto-bundle` jar
+to be placed into `<presto_install>/plugin/hive-hadoop2/`, across the
installation.
## Impala(Not Officially Released)