This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 48c72c30f35e docs: replace the notion of precombine field with
ordering field (#14322)
48c72c30f35e is described below
commit 48c72c30f35e7b507d63345bfbedebb259f8b3f5
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Nov 23 22:58:53 2025 -0600
docs: replace the notion of precombine field with ordering field (#14322)
Replace the deprecated notion of precombine field with ordering field.
- `hoodie.datasource.write.precombine.field` ->
`hoodie.table.ordering.fields`
- `write.precombine.field` -> `ordering.fields`
- any text that mentions precombine field or precombine is updated
accordingly
- sql examples still use `preCombineField` as the option key
- default `ts` is updated to "no default"
---
website/docs/clustering.md | 4 +-
website/docs/compaction.md | 2 +-
website/docs/concurrency_control.md | 2 +-
website/docs/deployment.md | 4 +-
website/docs/disaster_recovery.md | 6 +-
website/docs/flink-quick-start-guide.md | 2 +-
website/docs/gcp_bigquery.md | 2 +-
website/docs/metadata_indexing.md | 2 +-
website/docs/migration_guide.md | 2 +-
website/docs/procedures.md | 2 +-
website/docs/quick-start-guide.md | 10 +--
website/docs/reading_tables_streaming_reads.md | 4 +-
website/docs/schema_evolution.md | 100 ++++++++++++------------
website/docs/sql_ddl.md | 10 +--
website/docs/sql_dml.md | 46 +++++------
website/docs/sql_queries.md | 2 +-
website/docs/syncing_metastore.md | 26 +++---
website/docs/write_operations.md | 34 ++++----
website/docs/writing_data.md | 25 +++---
website/docs/writing_tables_streaming_writes.md | 4 +-
20 files changed, 145 insertions(+), 144 deletions(-)
diff --git a/website/docs/clustering.md b/website/docs/clustering.md
index e1894e19c239..66f1bfc0e544 100644
--- a/website/docs/clustering.md
+++ b/website/docs/clustering.md
@@ -193,7 +193,7 @@ import org.apache.hudi.config.HoodieWriteConfig._
val df = //generate data frame
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", "tableName").
@@ -305,7 +305,7 @@ val commonOpts = Map(
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.datasource.write.recordkey.field" -> "_row_key",
"hoodie.datasource.write.partitionpath.field" -> "partition",
- "hoodie.datasource.write.precombine.field" -> "timestamp",
+ "hoodie.table.ordering.fields" -> "timestamp",
"hoodie.table.name" -> "hoodie_test"
)
diff --git a/website/docs/compaction.md b/website/docs/compaction.md
index 7acec30db5fe..fb3212df1df0 100644
--- a/website/docs/compaction.md
+++ b/website/docs/compaction.md
@@ -154,7 +154,7 @@ import org.apache.spark.sql.streaming.ProcessingTime;
.option("hoodie.datasource.write.table.type", tableType)
.option("hoodie.datasource.write.recordkey.field", "_row_key")
.option("hoodie.datasource.write.partitionpath.field", "partition")
- .option("hoodie.datasource.write.precombine.field"(), "timestamp")
+ .option("hoodie.table.ordering.fields", "timestamp")
.option("hoodie.compact.inline.max.delta.commits", "10")
.option("hoodie.datasource.compaction.async.enable", "true")
.option("hoodie.table.name", tableName).option("checkpointLocation",
checkpointLocation)
diff --git a/website/docs/concurrency_control.md
b/website/docs/concurrency_control.md
index 511516b5df0e..541ae2d535c0 100644
--- a/website/docs/concurrency_control.md
+++ b/website/docs/concurrency_control.md
@@ -307,7 +307,7 @@ Following is an example of how to use
optimistic_concurrency_control via spark d
```java
inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
- .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.table.ordering.fields", "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode",
"optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
diff --git a/website/docs/deployment.md b/website/docs/deployment.md
index 1e0df56da363..8d265e21a395 100644
--- a/website/docs/deployment.md
+++ b/website/docs/deployment.md
@@ -143,7 +143,7 @@ inputDF.write()
.options(clientOpts) // any of the Hudi client opts can be passed in as
well
.option("hoodie.datasource.write.recordkey.field", "_row_key")
.option("hoodie.datasource.write.partitionpath.field", "partition")
- .option("hoodie.datasource.write.precombine.field"(), "timestamp")
+ .option("hoodie.table.ordering.fields", "timestamp")
.option("hoodie.table.name", tableName)
.mode(SaveMode.Append)
.save(basePath);
@@ -228,7 +228,7 @@ val inserts =
convertToStringList(dataGen.generateInserts(100)).toList
val insertDf = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
insertDf.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
diff --git a/website/docs/disaster_recovery.md
b/website/docs/disaster_recovery.md
index 17e0e86b4751..fb96aefdde56 100644
--- a/website/docs/disaster_recovery.md
+++ b/website/docs/disaster_recovery.md
@@ -48,7 +48,7 @@ val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -63,7 +63,7 @@ for (_ <- 1 to 4) {
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -204,7 +204,7 @@ for (_ <- 1 to 3) {
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index f29f2ba1f7b3..4cec86178982 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -194,7 +194,7 @@ String basePath = "file:///tmp/hudi_table";
Map<String, String> options = new HashMap<>();
options.put("path", basePath);
options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
-options.put("precombine.field", "ts");
+options.put("ordering.fields", "ts");
DataStream<RowData> dataStream = env.addSource(...);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
diff --git a/website/docs/gcp_bigquery.md b/website/docs/gcp_bigquery.md
index 51c6fd6c1003..23e020b2419c 100644
--- a/website/docs/gcp_bigquery.md
+++ b/website/docs/gcp_bigquery.md
@@ -94,7 +94,7 @@ spark-submit --master yarn \
--hoodie-conf hoodie.datasource.hive_sync.table=mytable \
--hoodie-conf hoodie.datasource.write.recordkey.field=mykey \
--hoodie-conf hoodie.datasource.write.partitionpath.field=year,month,day \
---hoodie-conf hoodie.datasource.write.precombine.field=ts \
+--hoodie-conf hoodie.table.ordering.fields=ts \
--hoodie-conf hoodie.datasource.write.keygenerator.type=COMPLEX \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.drop.partition.columns=true \
diff --git a/website/docs/metadata_indexing.md
b/website/docs/metadata_indexing.md
index ffacbdf20fe9..86df7c58061c 100644
--- a/website/docs/metadata_indexing.md
+++ b/website/docs/metadata_indexing.md
@@ -138,7 +138,7 @@ from raw parquet to Hudi table. We used the widely
available [NY Taxi dataset](h
```bash
hoodie.datasource.write.recordkey.field=VendorID
hoodie.datasource.write.partitionpath.field=tpep_dropoff_datetime
-hoodie.datasource.write.precombine.field=tpep_dropoff_datetime
+hoodie.table.ordering.fields=tpep_dropoff_datetime
hoodie.streamer.source.dfs.root=/Users/home/path/to/data/parquet_files/
hoodie.streamer.schemaprovider.target.schema.file=/Users/home/path/to/schema/schema.avsc
hoodie.streamer.schemaprovider.source.schema.file=/Users/home/path/to/schema/schema.avsc
diff --git a/website/docs/migration_guide.md b/website/docs/migration_guide.md
index 6986659b1a12..84147bc06fd9 100644
--- a/website/docs/migration_guide.md
+++ b/website/docs/migration_guide.md
@@ -64,7 +64,7 @@ spark-submit --master local \
--hoodie-conf hoodie.bootstrap.base.path=/tmp/source_table \
--hoodie-conf hoodie.datasource.write.recordkey.field=${KEY_FIELD} \
--hoodie-conf hoodie.datasource.write.partitionpath.field=${PARTITION_FIELD} \
---hoodie-conf hoodie.datasource.write.precombine.field=${PRECOMBINE_FILED} \
+--hoodie-conf hoodie.table.ordering.fields=${ORDERING_FIELDS} \
--hoodie-conf
hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf
hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector
\
--hoodie-conf hoodie.bootstrap.mode.selector.regex='.*' \
diff --git a/website/docs/procedures.md b/website/docs/procedures.md
index 0fa45c36a32c..62d7766c44b4 100644
--- a/website/docs/procedures.md
+++ b/website/docs/procedures.md
@@ -995,7 +995,7 @@ call show_table_properties(table => 'test_hudi_table',
limit => 10);
| key | value |
|-------------------------------|-------|
-| hoodie.table.precombine.field | ts |
+| hoodie.table.ordering.fields | ts |
| hoodie.table.partition.fields | dt |
### show_fs_path_detail
diff --git a/website/docs/quick-start-guide.md
b/website/docs/quick-start-guide.md
index 58d62a237b73..24ea218319fb 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -1184,15 +1184,15 @@ PARTITIONED BY (city);
:::note Implications of defining record keys
Configuring keys for a Hudi table, has a new implications on the table. If
record key is set by the user, `upsert` is chosen as the [write
operation](write_operations).
-Also if a record key is configured, then it's also advisable to specify a
precombine or ordering field, to correctly handle cases where the source data
has
+Also if a record key is configured, then it's also advisable to specify
ordering fields, to correctly handle cases where the source data has
multiple records with the same key. See section below.
:::
## Merge Modes
-Hudi also allows users to specify a _precombine_ field, which will be used to
order and resolve conflicts between multiple versions of the same record. This
is very important for
+Hudi also allows users to specify ordering fields, which will be used to order
and resolve conflicts between multiple versions of the same record. This is
very important for
use-cases like applying database CDC logs to a Hudi table, where a given
record may appear multiple times in the source data due to repeated upstream
updates.
Hudi also uses this mechanism to support out-of-order data arrival into a
table, where records may need to be resolved in a different order than their
commit time.
-For e.g. using a _created_at_ timestamp field as the precombine field will
prevent older versions of a record from overwriting newer ones or being exposed
to queries, even
+For e.g. using a _created_at_ timestamp field as an ordering field will
prevent older versions of a record from overwriting newer ones or being exposed
to queries, even
if they are written at a later commit time to the table. This is one of the
key features, that makes Hudi, best suited for dealing with streaming data.
To enable different merge semantics, Hudi supports [merge
modes](record_merger). Commit time and event time based merge modes are
supported out of the box.
@@ -1214,7 +1214,7 @@ values={[
// spark-shell
updatesDf.write.format("hudi").
...
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
...
```
@@ -1226,7 +1226,7 @@ updatesDf.write.format("hudi").
# pyspark
hudi_options = {
...
-'hoodie.datasource.write.precombine.field': 'ts'
+'hoodie.table.ordering.fields': 'ts'
}
upsert.write.format("hudi").
diff --git a/website/docs/reading_tables_streaming_reads.md
b/website/docs/reading_tables_streaming_reads.md
index 5e73524e14d4..5055f42c0449 100644
--- a/website/docs/reading_tables_streaming_reads.md
+++ b/website/docs/reading_tables_streaming_reads.md
@@ -26,7 +26,7 @@ values={[
// reload data
df.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -64,7 +64,7 @@ hudi_options = {
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.table.ordering.fields': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
diff --git a/website/docs/schema_evolution.md b/website/docs/schema_evolution.md
index 1638d6ad1c6f..91465606cad6 100755
--- a/website/docs/schema_evolution.md
+++ b/website/docs/schema_evolution.md
@@ -49,10 +49,10 @@ This chart shows what the table schema will be when an
incoming column type has
| Incoming Schema ↓ \ Table Schema → | int | long | float |
double | string | bytes |
|------------------------------------------------|--------|--------|--------|--------|--------|-------|
| int | int | long | float |
double | string | X |
-| long | long | long | float |
double | string | X |
-| float | float | float | float |
double | string | X |
-| double | double | double | double |
double | string | X |
-| string | string | string | string |
string | string | bytes |
+| long | long | long | float |
double | string | X |
+| float | float | float | float |
double | string | X |
+| double | double | double | double |
double | string | X |
+| string | string | string | string |
string | string | bytes |
| bytes | X | X | X |
X | string | bytes |
## Schema Evolution on read
@@ -82,12 +82,12 @@ ALTER TABLE tableName ADD COLUMNS(col_spec[, col_spec ...])
Column specification consists of five field, next to each other.
-| Parameter | Description
|
-|:-------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| col_name | name of the new column. To add sub-column col1 to a nested
map type column member map\<string, struct\<n: string, a: int>>, set this field
to member.value.col1
|
-| col_type | type of the new column.
|
-| nullable | whether or not the new column allows null values. (optional)
|
-| comment | comment of the new column. (optional)
|
+| Parameter | Description
|
+|:-------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| col_name | name of the new column. To add sub-column col1 to a nested
map type column member map\<string, struct\<n: string, a: int>>, set this field
to member.value.col1
|
+| col_type | type of the new column.
|
+| nullable | whether or not the new column allows null values. (optional)
|
+| comment | comment of the new column. (optional)
|
| col_position | The position where the new column is added. The value can be
*FIRST* or *AFTER origin_col*. If it is set to *FIRST*, the new column will be
added before the first column of the table. If it is set to *AFTER origin_col*,
the new column will be added after the original column. *FIRST* can be used
only when new sub-columns are added to nested columns and not in top-level
columns. There are no restrictions on the usage of *AFTER*. |
**Examples**
@@ -107,13 +107,13 @@ ALTER TABLE tableName ALTER [COLUMN] col_old_name TYPE
column_type [COMMENT] col
**Parameter Description**
-| Parameter | Description
|
-|:-----------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
-| tableName | Table name.
|
-| col_old_name | Name of the column to be altered.
|
-| column_type | Type of the target column.
|
-| col_comment | Optional comments on the altered column.
|
-| column_name | The new position to place the altered column. For
example, *AFTER* **column_name** indicates that the target column is placed
after **column_name**. |
+| Parameter | Description
|
+|:-------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| tableName | Table name.
|
+| col_old_name | Name of the column to be altered.
|
+| column_type | Type of the target column.
|
+| col_comment | Optional comments on the altered column.
|
+| column_name | The new position to place the altered column. For example,
*AFTER* **column_name** indicates that the target column is placed after
**column_name**. |
**Examples**
@@ -131,15 +131,15 @@ ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
**column type change**
-| Source\Target | long | float | double | string | decimal | date | int |
-|--------------------|-------|-------|--------|--------|---------|------|-----|
-| int | Y | Y | Y | Y | Y | N | Y |
-| long | Y | Y | Y | Y | Y | N | N |
-| float | N | Y | Y | Y | Y | N | N |
-| double | N | N | Y | Y | Y | N | N |
-| decimal | N | N | N | Y | Y | N | N |
-| string | N | N | N | Y | Y | Y | N |
-| date | N | N | N | Y | N | Y | N |
+| Source\Target | long | float | double | string | decimal | date | int |
+|---------------|------|-------|--------|--------|---------|------|-----|
+| int | Y | Y | Y | Y | Y | N | Y |
+| long | Y | Y | Y | Y | Y | N | N |
+| float | N | Y | Y | Y | Y | N | N |
+| double | N | N | Y | Y | Y | N | N |
+| decimal | N | N | N | Y | Y | N | N |
+| string | N | N | N | Y | Y | Y | N |
+| date | N | N | N | Y | N | Y | N |
### Deleting Columns
**Syntax**
@@ -193,7 +193,7 @@ val basePath = "file:///tmp/hudi_trips_cow"
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
- StructField("preComb", LongType,true),
+ StructField("orderingField", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("intToLong", IntegerType,true)
@@ -207,7 +207,7 @@ val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
var dfFromData1 = spark.createDataFrame(data1, schema)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "preComb").
+ option("hoodie.table.ordering.fields", "orderingField").
option("hoodie.datasource.write.recordkey.field", "rowId").
option("hoodie.datasource.write.partitionpath.field", "partitionId").
option("hoodie.index.type","SIMPLE").
@@ -231,20 +231,20 @@ scala> spark.sql("desc hudi_trips_snapshot").show()
| _hoodie_file_name| string| null|
| rowId| string| null|
| partitionId| string| null|
- | preComb| bigint| null|
+ | orderingField| bigint| null|
| name| string| null|
| versionId| string| null|
| intToLong| int| null|
+--------------------+---------+-------+
-scala> spark.sql("select rowId, partitionId, preComb, name, versionId,
intToLong from hudi_trips_snapshot").show()
- +-----+-----------+-------+----+---------+---------+
- |rowId|partitionId|preComb|name|versionId|intToLong|
- +-----+-----------+-------+----+---------+---------+
- |row_3| part_0| 0| tom| v_0| 0|
- |row_2| part_0| 0|john| v_0| 0|
- |row_1| part_0| 0| bob| v_0| 0|
- +-----+-----------+-------+----+---------+---------+
+scala> spark.sql("select rowId, partitionId, orderingField, name, versionId,
intToLong from hudi_trips_snapshot").show()
+ +-----+-----------+-------------+----+---------+---------+
+ |rowId|partitionId|orderingField|name|versionId|intToLong|
+ +-----+-----------+-------------+----+---------+---------+
+ |row_3| part_0| 0| tom| v_0| 0|
+ |row_2| part_0| 0|john| v_0| 0|
+ |row_1| part_0| 0| bob| v_0| 0|
+ +-----+-----------+-------------+----+---------+---------+
// In the new schema, we are going to add a String field and
// change the datatype `intToLong` field from int to long.
@@ -252,7 +252,7 @@ scala> :paste
val newSchema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
- StructField("preComb", LongType,true),
+ StructField("orderingField", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("intToLong", LongType,true),
@@ -266,7 +266,7 @@ val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3",
3L, "newField_1"),
var dfFromData2 = spark.createDataFrame(data2, newSchema)
dfFromData2.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "preComb").
+ option("hoodie.table.ordering.fields", "orderingField").
option("hoodie.datasource.write.recordkey.field", "rowId").
option("hoodie.datasource.write.partitionpath.field", "partitionId").
option("hoodie.index.type","SIMPLE").
@@ -290,7 +290,7 @@ scala> spark.sql("desc hudi_trips_snapshot").show()
| _hoodie_file_name| string| null|
| rowId| string| null|
| partitionId| string| null|
- | preComb| bigint| null|
+ | orderingField| bigint| null|
| name| string| null|
| versionId| string| null|
| intToLong| bigint| null|
@@ -298,16 +298,16 @@ scala> spark.sql("desc hudi_trips_snapshot").show()
+--------------------+---------+-------+
-scala> spark.sql("select rowId, partitionId, preComb, name, versionId,
intToLong, newField from hudi_trips_snapshot").show()
- +-----+-----------+-------+-------+---------+---------+----------+
- |rowId|partitionId|preComb| name|versionId|intToLong| newField|
- +-----+-----------+-------+-------+---------+---------+----------+
- |row_3| part_0| 0| tom| v_0| 0| null|
- |row_2| part_0| 5| john| v_3| 3|newField_1|
- |row_1| part_0| 0| bob| v_0| 0| null|
- |row_5| part_0| 5| maroon| v_2| 2|newField_1|
- |row_9| part_0| 5|michael| v_2| 2|newField_1|
- +-----+-----------+-------+-------+---------+---------+----------+
+scala> spark.sql("select rowId, partitionId, orderingField, name, versionId,
intToLong, newField from hudi_trips_snapshot").show()
+ +-----+-----------+-------------+-------+---------+---------+----------+
+ |rowId|partitionId|orderingField| name|versionId|intToLong| newField|
+ +-----+-----------+-------------+-------+---------+---------+----------+
+ |row_3| part_0| 0| tom| v_0| 0| null|
+ |row_2| part_0| 5| john| v_3| 3|newField_1|
+ |row_1| part_0| 0| bob| v_0| 0| null|
+ |row_5| part_0| 5| maroon| v_2| 2|newField_1|
+ |row_9| part_0| 5|michael| v_2| 2|newField_1|
+ +-----+-----------+-------------+-------+---------+---------+----------+
```
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index d1a08846ecc0..e7127641e6de 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -77,7 +77,7 @@ should be specified as `PARTITIONED BY (dt, hh)`.
As discussed [here](quick-start-guide#keys), tables track each record in the
table using a record key. Hudi auto-generated a highly compressed
key for each new record in the examples so far. If you want to use an existing
field as the key, you can set the `primaryKey` option.
-Typically, this is also accompanied by configuring a `preCombineField` option
to deal with out-of-order data and potential
+Typically, this is also accompanied by configuring ordering fields (via
`preCombineField` option) to deal with out-of-order data and potential
duplicate records with the same key in the incoming writes.
:::note
@@ -86,7 +86,7 @@ this materializes a composite key of the two fields, which
can be useful for exp
:::
Here is an example of creating a table using both options. Typically, a field
that denotes the time of the event or
-fact, e.g., order creation time, event generation time etc., is used as the
_preCombineField_. Hudi resolves multiple versions
+fact, e.g., order creation time, event generation time etc., is used as the
ordering field (via `preCombineField`). Hudi resolves multiple versions
of the same record by ordering based on this field when queries are run on the
table.
```sql
@@ -124,7 +124,7 @@ TBLPROPERTIES (
LOCATION 'file:///tmp/hudi_table_merge_mode/';
```
-With `EVENT_TIME_ORDERING`, the record with the larger event time
(`precombineField`) overwrites the record with the
+With `EVENT_TIME_ORDERING`, the record with the larger event time (specified
via `precombineField` ordering field) overwrites the record with the
smaller event time on the same key, regardless of transaction's commit time.
Users can set `CUSTOM` mode to provide their own
merge logic. With `CUSTOM` merge mode, you can provide a custom class that
implements the merge logic. The interfaces
to implement is explained in detail [here](record_merger#custom).
@@ -579,7 +579,7 @@ Users can set table properties while creating a table. The
important table prope
|------------------|--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| type | cow | The table type to create. `type = 'cow'` creates a
COPY-ON-WRITE table, while `type = 'mor'` creates a MERGE-ON-READ table. Same
as `hoodie.datasource.write.table.type`. More details can be found
[here](table_types)
|
| primaryKey | uuid | The primary key field names of the table separated by
commas. Same as `hoodie.datasource.write.recordkey.field`. If this config is
ignored, hudi will auto-generate primary keys. If explicitly set, primary key
generation will honor user configuration. |
-| preCombineField | | The pre-combine field of the table. It is used for
resolving the final version of the record among multiple versions. Generally,
`event time` or another similar column will be used for ordering purposes. Hudi
will be able to handle out-of-order data using the preCombine field value. |
+| preCombineField | | The ordering field(s) of the table. It is used for
resolving the final version of the record among multiple versions. Generally,
`event time` or another similar column will be used for ordering purposes. Hudi
will be able to handle out-of-order data using the ordering field value. |
:::note
`primaryKey`, `preCombineField`, and `type` and other properties are
case-sensitive.
@@ -936,7 +936,7 @@ WITH (
'connector' = 'hudi',
'path' = 'file:///tmp/hudi_table',
'table.type' = 'MERGE_ON_READ',
-'precombine.field' = 'ts',
+'ordering.fields' = 'ts',
'hoodie.cleaner.fileversions.retained' = '20',
'hoodie.keep.max.commits' = '20',
'hoodie.datasource.write.hive_style_partitioning' = 'true'
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index e651c1a47a9d..4b4f94266b40 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -51,7 +51,7 @@ INSERT INTO hudi_cow_pt_tbl PARTITION(dt, hh) SELECT 1 AS id,
'a1' AS name, 1000
:::note Mapping to write operations
Hudi offers flexibility in choosing the underlying [write
operation](write_operations) of a `INSERT INTO` statement using
the `hoodie.spark.sql.insert.into.operation` configuration. Possible options
include *"bulk_insert"* (large inserts), *"insert"* (with small file
management),
-and *"upsert"* (with deduplication/merging). If a precombine field is not set,
*"insert"* is chosen as the default. For a table with preCombine field set,
+and *"upsert"* (with deduplication/merging). If ordering fields are not set,
*"insert"* is chosen as the default. For a table with ordering fields set (via
`preCombineField`),
*"upsert"* is chosen as the default operation.
:::
@@ -101,7 +101,7 @@ update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';
```
:::info
-The `UPDATE` operation requires the specification of a `preCombineField`.
+The `UPDATE` operation requires the specification of ordering fields (via
`preCombineField`).
:::
### Merge Into
@@ -138,9 +138,9 @@ For a Hudi table with user configured primary keys, the
join condition and the `
For a table where Hudi auto generates primary keys, the join condition in
`MERGE INTO` can be on any arbitrary data columns.
-if the `hoodie.record.merge.mode` is set to `EVENT_TIME_ORDERING`, the
`preCombineField` is required to be set with value in the `UPDATE`/`INSERT`
clause.
+if the `hoodie.record.merge.mode` is set to `EVENT_TIME_ORDERING`, ordering
fields (via `preCombineField`) are required to be set with value in the
`UPDATE`/`INSERT` clause.
-It is enforced that if the target table has primary key and partition key
column, the source table counterparts must enforce the same data type
accordingly. Plus, if the target table is configured with
`hoodie.record.merge.mode` = `EVENT_TIME_ORDERING` where target table is
expected to have a valid precombine field configuration, the source table
counterpart must also have the same data type.
+It is enforced that if the target table has primary key and partition key
column, the source table counterparts must enforce the same data type
accordingly. Plus, if the target table is configured with
`hoodie.record.merge.mode` = `EVENT_TIME_ORDERING` where target table is
expected to have valid ordering fields configuration, the source table
counterpart must also have the same data type.
:::
Examples below
@@ -374,28 +374,28 @@ INSERT INTO hudi_table/*+
OPTIONS('hoodie.keep.max.commits'='true')*/
The hudi-flink module defines the Flink SQL connector for both hudi source and
sink.
There are a number of options available for the sink table:
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| path | Y | N/A | Base path for the target hoodie table. The path would be
created if it does not exist, otherwise a hudi table expects to be initialized
successfully |
-| table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ |
-| write.operation | N | upsert | The write operation, that this write should
do (insert or upsert is supported) |
-| write.precombine.field | N | ts | Field used in preCombining before actual
write. When two records have the same key value, we will pick the one with the
largest value for the precombine field, determined by Object.compareTo(..) |
-| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload
class used. Override this, if you like to roll your own merge logic, when
upserting/inserting. This will render any value set for the option in-effective
|
-| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop
duplicates upon insert. By default insert will accept duplicates, to gain extra
performance |
-| write.ignore.failed | N | true | Flag to indicate whether to ignore any non
exception error (e.g. writestatus error). within a checkpoint batch. By default
true (in favor of streaming progressing over data integrity) |
-| hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value
to be used as the `recordKey` component of `HoodieKey`. Actual value will be
obtained by invoking .toString() on the field value. Nested fields can be
specified using the dot notation eg: `a.b.c` |
-| hoodie.datasource.write.keygenerator.class | N |
SimpleAvroKeyGenerator.class | Key generator class, that implements will
extract the key out of incoming record |
-| write.tasks | N | 4 | Parallelism of tasks that do actual write, default is
4 |
-| write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into
the underneath filesystem |
+| Option Name | Required | Default
| Remarks
|
+|--------------------------------------------|----------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| path | Y | N/A
| Base path for the target hoodie table. The path would be
created if it does not exist, otherwise a hudi table expects to be initialized
successfully
|
+| table.type | N | COPY_ON_WRITE
| Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ
|
+| write.operation | N | upsert
| The write operation, that this write should do (insert or
upsert is supported)
|
+| write.precombine.field | N | (no default)
| Field used for ordering records before actual write. When two
records have the same key value, we will pick the one with the largest value
for the ordering field, determined by Object.compareTo(..). Note: This config
is deprecated, use `ordering.fields` instead |
+| write.payload.class | N |
OverwriteWithLatestAvroPayload.class | Payload class used. Override this if
you would like to roll your own merge logic when upserting/inserting. This will
render any value set for the option ineffective
|
+| write.insert.drop.duplicates | N | false
| Flag to indicate whether to drop duplicates upon insert. By
default insert will accept duplicates, to gain extra performance
|
+| write.ignore.failed | N | true
| Flag to indicate whether to ignore any non-exception error
(e.g. writestatus error) within a checkpoint batch. By default true (in favor
of streaming progressing over data integrity)
|
+| hoodie.datasource.write.recordkey.field | N | uuid
| Record key field. Value to be used as the `recordKey`
component of `HoodieKey`. Actual value will be obtained by invoking .toString()
on the field value. Nested fields can be specified using the dot notation eg:
`a.b.c` |
+| hoodie.datasource.write.keygenerator.class | N |
SimpleAvroKeyGenerator.class | Key generator class that
will extract the key out of the incoming record
|
+| write.tasks | N | 4
| Parallelism of tasks that do actual write, default is 4
|
+| write.batch.size.MB | N | 128
| Batch buffer size in MB to flush data into the underneath
filesystem
|
If the table type is MERGE_ON_READ, you can also specify the asynchronous
compaction strategy through options:
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| compaction.async.enabled | N | true | Async Compaction, enabled by default
for MOR |
-| compaction.trigger.strategy | N | num_commits | Strategy to trigger
compaction, options are 'num_commits': trigger compaction when reach N delta
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
-| compaction.delta_commits | N | 5 | Max delta commits needed to trigger
compaction, default 5 commits |
-| compaction.delta_seconds | N | 3600 | Max delta seconds time needed to
trigger compaction, default 1 hour |
+| Option Name | Required | Default | Remarks
|
+|-----------------------------|----------|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| compaction.async.enabled | N | true | Async Compaction,
enabled by default for MOR
|
+| compaction.trigger.strategy | N | num_commits | Strategy to trigger
compaction, options are 'num_commits': trigger compaction when reach N delta
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
+| compaction.delta_commits | N | 5 | Max delta commits
needed to trigger compaction, default 5 commits
|
+| compaction.delta_seconds | N | 3600 | Max delta seconds
time needed to trigger compaction, default 1 hour
|
You can write the data using the SQL `INSERT INTO` statements:
```sql
diff --git a/website/docs/sql_queries.md b/website/docs/sql_queries.md
index d540816aedaf..aba1d5845c5b 100644
--- a/website/docs/sql_queries.md
+++ b/website/docs/sql_queries.md
@@ -225,7 +225,7 @@ INSERT INTO hudi_table_merge_mode VALUES (1, 'a1', 900,
20.0);
SELECT id, name, ts, price FROM hudi_table_merge_mode;
```
-With `EVENT_TIME_ORDERING`, the record with the larger event time
(`precombineField`) overwrites the record with the
+With `EVENT_TIME_ORDERING`, the record with the larger event time (the
ordering field, specified via `precombineField`) overwrites the record with the
smaller event time on the same key, regardless of transaction time.
### Snapshot Query with Custom Merge Mode
diff --git a/website/docs/syncing_metastore.md
b/website/docs/syncing_metastore.md
index d8d5583b9ef2..24d652d1781b 100644
--- a/website/docs/syncing_metastore.md
+++ b/website/docs/syncing_metastore.md
@@ -41,7 +41,7 @@ val basePath = "/user/hive/warehouse/hudi_cow"
val schema = StructType(Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
-StructField("preComb", LongType,true),
+StructField("orderingField", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
@@ -57,7 +57,7 @@ var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "preComb").
+ option("hoodie.table.ordering.fields", "orderingField").
option("hoodie.datasource.write.recordkey.field", "rowId").
option("hoodie.datasource.write.partitionpath.field", "partitionId").
option("hoodie.database.name", databaseName).
@@ -100,7 +100,7 @@ Beeline version 1.2.1.spark2 by Apache Hive
1 row selected (0.531 seconds)
0: jdbc:hive2://hiveserver:10000> select * from hudi_cow limit 1;
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
-| hudi_cow._hoodie_commit_time | hudi_cow._hoodie_commit_seqno |
hudi_cow._hoodie_record_key | hudi_cow._hoodie_partition_path |
hudi_cow._hoodie_file_name | hudi_cow.rowid
| hudi_cow.precomb | hudi_cow.name | hudi_cow.versionid |
hudi_cow.tobedeletedstr | hudi_cow.inttolong | hudi_cow.longtoint |
hudi_cow.partitionid |
+| hudi_cow._hoodie_commit_time | hudi_cow._hoodie_commit_seqno |
hudi_cow._hoodie_record_key | hudi_cow._hoodie_partition_path |
hudi_cow._hoodie_file_name | hudi_cow.rowid
| hudi_cow.orderingfield | hudi_cow.name | hudi_cow.versionid |
hudi_cow.tobedeletedstr | hudi_cow.inttolong | hudi_cow.longtoint |
hudi_cow.partitionid |
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
| 20220120090023631 | 20220120090023631_1_2 | row_1
| partitionId=2021/01/01 |
0bf9b822-928f-4a57-950a-6a5450319c83-0_1-24-314_20220120090023631.parquet |
row_1 | 0 | bob | v_0 |
toBeDel0 | 0 | 1000000 |
2021/01/01 |
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
@@ -156,16 +156,16 @@ Corresponding datasource options for the most commonly
used hive sync configs ar
In the table below **(N/A)** means there is no default value set.
:::
-| HiveSyncConfig | DataSourceWriteOption | Default Value | Description |
-| ----------- | ----------- | ----------- | ----------- |
-| --database | hoodie.datasource.hive_sync.database | default | Name
of the target database in Hive metastore |
-| --table | hoodie.datasource.hive_sync.table | (N/A) | Name of the
target table in Hive. Inferred from the table name in Hudi table config if not
specified. |
-| --user | hoodie.datasource.hive_sync.username | hive | Username for
hive metastore |
-| --pass | hoodie.datasource.hive_sync.password | hive | Password for
hive metastore |
-| --jdbc-url | hoodie.datasource.hive_sync.jdbcurl |
jdbc:hive2://localhost:10000 | Hive server url if using `jdbc` mode to sync
|
-| --sync-mode | hoodie.datasource.hive_sync.mode | (N/A) | Mode to
choose for Hive ops. Valid values are `hms`, `jdbc` and `hiveql`. More details
in the following section. |
-| --partitioned-by | hoodie.datasource.hive_sync.partition_fields | (N/A)
| Comma-separated column names in the table to use for determining hive
partition. |
-| --partition-value-extractor |
hoodie.datasource.hive_sync.partition_extractor_class |
`org.apache.hudi.hive.MultiPartKeysValueExtractor` | Class which implements
`PartitionValueExtractor` to extract the partition values. Inferred
automatically depending on the partition fields specified. |
+| HiveSyncConfig | DataSourceWriteOption
| Default Value | Description
|
+|-----------------------------|-------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
+| --database | hoodie.datasource.hive_sync.database
| default | Name of the
target database in Hive metastore
|
+| --table | hoodie.datasource.hive_sync.table
| (N/A) | Name of the
target table in Hive. Inferred from the table name in Hudi table config if not
specified. |
+| --user | hoodie.datasource.hive_sync.username
| hive | Username for hive
metastore
|
+| --pass | hoodie.datasource.hive_sync.password
| hive | Password for hive
metastore
|
+| --jdbc-url | hoodie.datasource.hive_sync.jdbcurl
| jdbc:hive2://localhost:10000 | Hive server url
if using `jdbc` mode to sync
|
+| --sync-mode | hoodie.datasource.hive_sync.mode
| (N/A) | Mode to choose
for Hive ops. Valid values are `hms`, `jdbc` and `hiveql`. More details in the
following section. |
+| --partitioned-by | hoodie.datasource.hive_sync.partition_fields
| (N/A) | Comma-separated
column names in the table to use for determining hive partition.
|
+| --partition-value-extractor |
hoodie.datasource.hive_sync.partition_extractor_class |
`org.apache.hudi.hive.MultiPartKeysValueExtractor` | Class which implements
`PartitionValueExtractor` to extract the partition values. Inferred
automatically depending on the partition fields specified. |
### Sync modes
diff --git a/website/docs/write_operations.md b/website/docs/write_operations.md
index a3a1311ac8b6..3a51072f2589 100644
--- a/website/docs/write_operations.md
+++ b/website/docs/write_operations.md
@@ -93,27 +93,27 @@ Here are the basic configs relevant to the write operations
types mentioned abov
**Spark based configs:**
-| Config Name | Default |
Description
[...]
-|------------------------------------------------|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| hoodie.datasource.write.operation | upsert (Optional) |
Whether to do upsert, insert or bulk_insert for the write operation. Use
bulk_insert to load new data into a table, and there on use upsert/insert. bulk
insert uses a disk based write path to scale to load large inputs without need
to cache it.<br /><br />`Config Param: OPERATION`
[...]
-| hoodie.datasource.write.precombine.field | ts (Optional) |
Field used in preCombining before actual write. When two records have the same
key value, we will pick the one with the largest value for the precombine
field, determined by Object.compareTo(..)<br /><br />`Config Param:
PRECOMBINE_FIELD`
[...]
-| hoodie.combine.before.insert | false (Optional) | When
inserted records share same key, controls whether they should be first combined
(i.e de-duplicated) before writing to storage.<br /><br />`Config Param:
COMBINE_BEFORE_INSERT`
[...]
-| hoodie.datasource.write.insert.drop.duplicates | false (Optional) | If
set to true, records from the incoming dataframe will not overwrite existing
records with the same key during the write operation. This config is deprecated
as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead.<br /><br
/>`Config Param: INSERT_DROP_DUPS`
[...]
-| hoodie.bulkinsert.sort.mode | NONE (Optional) |
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode: Modes for sorting
records during bulk insert. <ul><li>`NONE(default)`: No sorting. Fastest and
matches `spark.write.parquet()` in number of files and
overhead.</li><li>`GLOBAL_SORT`: This ensures best file sizes, with lowest
memory overhead at cost of sorting.</li><li>`PARTITION_SORT`: Strikes a balance
by only sorting within a Spark RDD partition, still keep [...]
-| hoodie.bootstrap.base.path | N/A **(Required)** |
**Applicable only when** operation type is `bootstrap`. Base path of the
dataset that needs to be bootstrapped as a Hudi table<br /><br />`Config Param:
BASE_PATH`<br />`Since Version: 0.6.0`
[...]
-| hoodie.bootstrap.mode.selector |
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector
(Optional) | Selects the mode in which each file/partition in the
bootstrapped dataset gets bootstrapped<br />Possible
values:<ul><li>`org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector`:
In this mode, the full record data is not copied into Hudi therefore it avoids
full cost of rewriting the dataset. Instead, 'skeleton' files co [...]
-| hoodie.datasource.write.partitions.to.delete | N/A **(Required)** |
**Applicable only when** operation type is `delete_partition`. Comma separated
list of partitions to delete. Allows use of wildcard *<br /><br />`Config
Param: PARTITIONS_TO_DELETE`
[...]
+| Config Name | Default
| Description
[...]
+|------------------------------------------------|----------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| hoodie.datasource.write.operation | upsert (Optional)
| Whether to do
upsert, insert or bulk_insert for the write operation. Use bulk_insert to load
new data into a table, and there on use upsert/insert. bulk insert uses a disk
based write path to scale to load large inputs without need to cache it.<br
/><br />`Config Param: OPERATION`
[...]
+| hoodie.datasource.write.precombine.field | (no default) (Optional)
| Field used for
ordering records before actual write. When two records have the same key value,
we will pick the one with the largest value for the ordering field, determined
by Object.compareTo(..). Note: This config is deprecated, use
`hoodie.table.ordering.fields` instead.<br /><br />`Config Param:
PRECOMBINE_FIELD` [...]
+| hoodie.combine.before.insert | false (Optional)
| When inserted
records share same key, controls whether they should be first combined (i.e
de-duplicated) before writing to storage.<br /><br />`Config Param:
COMBINE_BEFORE_INSERT`
[...]
+| hoodie.datasource.write.insert.drop.duplicates | false (Optional)
| If set to true,
records from the incoming dataframe will not overwrite existing records with
the same key during the write operation. This config is deprecated as of
0.14.0. Please use hoodie.datasource.insert.dup.policy instead.<br /><br
/>`Config Param: INSERT_DROP_DUPS`
[...]
+| hoodie.bulkinsert.sort.mode | NONE (Optional)
|
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode: Modes for sorting
records during bulk insert. <ul><li>`NONE(default)`: No sorting. Fastest and
matches `spark.write.parquet()` in number of files and
overhead.</li><li>`GLOBAL_SORT`: This ensures best file sizes, with lowest
memory overhead at cost of sorting.</li><li>`PARTITION_SORT`: Strikes [...]
+| hoodie.bootstrap.base.path | N/A **(Required)**
| **Applicable only
when** operation type is `bootstrap`. Base path of the dataset that needs to be
bootstrapped as a Hudi table<br /><br />`Config Param: BASE_PATH`<br />`Since
Version: 0.6.0`
[...]
+| hoodie.bootstrap.mode.selector |
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector
(Optional) | Selects the mode in which each file/partition in the bootstrapped
dataset gets bootstrapped<br />Possible
values:<ul><li>`org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector`:
In this mode, the full record data is not copied into Hudi therefore it avoids
full cost of rewriting the dataset. Instead, 'skeleton' files containing [...]
+| hoodie.datasource.write.partitions.to.delete | N/A **(Required)**
| **Applicable only
when** operation type is `delete_partition`. Comma separated list of partitions
to delete. Allows use of wildcard *<br /><br />`Config Param:
PARTITIONS_TO_DELETE`
[...]
**Flink based configs:**
-| Config Name | Default |
Description
|
-|------------------------------------------------|----------------------|-------------------------------------------------------------------------------------|
-| write.operation | upsert (Optional) | The
write operation, that this write should do<br /><br /> `Config Param:
OPERATION`|
-| precombine.field | ts (Optional) |
Field used in preCombining before actual write. When two records have the same
key value, we will pick the one with the largest value for the precombine
field, determined by Object.compareTo(..)<br /><br /> `Config Param:
PRECOMBINE_FIELD`|
-| write.precombine | false (Optional) | Flag
to indicate whether to drop duplicates before insert/upsert. By default these
cases will accept duplicates, to gain extra performance: 1) insert operation;
2) upsert for MOR table, the MOR table deduplicate on reading<br /><br />
`Config Param: PRE_COMBINE`|
-| write.bulk_insert.sort_input | true (Optional) |
Whether to sort the inputs by specific fields for bulk insert tasks, default
true<br /><br /> `Config Param: WRITE_BULK_INSERT_SORT_INPUT`
|
-| write.bulk_insert.sort_input.by_record_key | false (Optional) |
Whether to sort the inputs by record keys for bulk insert tasks, default
false<br /><br /> `Config Param: WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY`
|
+| Config Name | Default |
Description
|
+|--------------------------------------------|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| write.operation | upsert (Optional) | The
write operation, that this write should do<br /><br /> `Config Param:
OPERATION`
|
+| ordering.fields | (no default) (Optional) |
Fields used for ordering records before actual write. When two records have the
same key value, we will pick the one with the largest value for the ordering
field, determined by Object.compareTo(..). Note: The old config
`precombine.field` is deprecated.<br /><br /> `Config Param: ORDERING_FIELDS` |
+| write.precombine | false (Optional) | Flag
to indicate whether to drop duplicates before insert/upsert. By default these
cases will accept duplicates, to gain extra performance: 1) insert operation;
2) upsert for MOR table, the MOR table deduplicate on reading<br /><br />
`Config Param: PRE_COMBINE` |
+| write.bulk_insert.sort_input | true (Optional) |
Whether to sort the inputs by specific fields for bulk insert tasks, default
true<br /><br /> `Config Param: WRITE_BULK_INSERT_SORT_INPUT`
|
+| write.bulk_insert.sort_input.by_record_key | false (Optional) |
Whether to sort the inputs by record keys for bulk insert tasks, default
false<br /><br /> `Config Param: WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY`
|
## Write path
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index bc77f3483e16..e3ab60562f6a 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -25,8 +25,9 @@ Default value: `"uuid"`<br/>
**PARTITIONPATH_FIELD**: Columns to be used for partitioning the table. To
prevent partitioning, provide empty string as value eg: `""`. Specify
partitioning/no partitioning using `KEYGENERATOR_CLASS_OPT_KEY`. If partition
path needs to be url encoded, you can set `URL_ENCODE_PARTITIONING_OPT_KEY`. If
synchronizing to hive, also specify using
`HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.`<br/>
Default value: `"partitionpath"`<br/>
-**PRECOMBINE_FIELD**: When two records within the same batch have the same key
value, the record with the largest value from the field specified will be
choosen. If you are using default payload of OverwriteWithLatestAvroPayload for
HoodieRecordPayload (`WRITE_PAYLOAD_CLASS`), an incoming record will always
takes precendence compared to the one in storage ignoring this
`PRECOMBINE_FIELD_OPT_KEY`. <br/>
-Default value: `"ts"`<br/>
+**ORDERING_FIELDS**: When two records within the same batch have the same key
value, the record with the largest value from the ordering field specified will
be chosen. If you are using the default payload of OverwriteWithLatestAvroPayload
for HoodieRecordPayload (`WRITE_PAYLOAD_CLASS`), an incoming record will always
take precedence compared to the one in storage, ignoring this ordering field
configuration. <br/>
+No default value<br/>
+Note: The config key `hoodie.datasource.write.precombine.field` is deprecated,
use `hoodie.table.ordering.fields` instead.
**OPERATION**: The [write operations](write_operations) to use.<br/>
Available values:<br/>
@@ -40,7 +41,7 @@ Available values:<br/>
Example:
-Upsert a DataFrame, specifying the necessary field names for `recordKey =>
_row_key`, `partitionPath => partition`, and `precombineKey => timestamp`
+Upsert a DataFrame, specifying the necessary field names for `recordKey =>
_row_key`, `partitionPath => partition`, and `orderingField => timestamp`
```java
inputDF.write()
@@ -48,7 +49,7 @@ inputDF.write()
.options(clientOpts) //Where clientOpts is of type Map[String, String].
clientOpts can include any other options necessary.
.option("hoodie.datasource.write.recordkey.field", "_row_key")
.option("hoodie.datasource.write.partitionpath.field", "partition")
- .option("hoodie.datasource.write.precombine.field"(), "timestamp")
+ .option("hoodie.table.ordering.fields", "timestamp")
.option("hoodie.table.name", tableName)
.mode(SaveMode.Append)
.save(basePath);
@@ -71,7 +72,7 @@ val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -104,7 +105,7 @@ hudi_options = {
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.table.ordering.fields': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
@@ -197,7 +198,7 @@ val df =
spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","insert_overwrite_table").
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -255,7 +256,7 @@ val df = spark.
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","insert_overwrite").
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -312,7 +313,7 @@ val softDeleteDf = nullifyColumns.
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation", "upsert").
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -346,7 +347,7 @@ val df =
spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","delete").
-option("hoodie.datasource.write.precombine.field", "ts").
+option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
@@ -361,7 +362,7 @@ This example will remove all the records from the table
that exist in the DataSe
deleteDF // dataframe containing just records to be deleted
.write().format("org.apache.hudi")
.option(...) // Add HUDI options like record-key, partition-path and others
as needed for your setup
- // specify record_key, partition_key, precombine_fieldkey & usual params
+ // specify record_key, partition_key, ordering_fields & usual params
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
"org.apache.hudi.EmptyHoodieRecordPayload")
```
@@ -426,7 +427,7 @@ Read more in-depth details about concurrency control in the
[concurrency control
```java
inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
- .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.table.ordering.fields", "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode",
"optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
diff --git a/website/docs/writing_tables_streaming_writes.md
b/website/docs/writing_tables_streaming_writes.md
index 86a790705e1c..dda2801d1de4 100644
--- a/website/docs/writing_tables_streaming_writes.md
+++ b/website/docs/writing_tables_streaming_writes.md
@@ -39,7 +39,7 @@ val df = spark.readStream.
// write stream to new hudi table
df.writeStream.format("hudi").
options(getQuickstartWriteConfigs).
- option("hoodie.datasource.write.precombine.field", "ts").
+ option("hoodie.table.ordering.fields", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", streamingTableName).
@@ -67,7 +67,7 @@ hudi_streaming_options = {
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': streamingTableName,
'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.table.ordering.fields': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}