infoverload commented on a change in pull request #16871:
URL: https://github.com/apache/flink/pull/16871#discussion_r691987961
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -506,22 +573,17 @@ behavior is slightly different between Table API and
DataStream API.
The DataStream API's `StreamExecutionEnvironment` acts as a _builder pattern_
to construct
Review comment:
```suggestion
The DataStream API's `StreamExecutionEnvironment` uses a _builder pattern_
to construct
```
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -2341,6 +2694,16 @@ correctly via the DataStream API's reflective type
extraction facilities. If the
`TypeInformation` is a `CompositeType`, it will be flattened in the first
level when deriving a table's
schema.
+{{< hint warning >}}
+Many times the DataStream API is unable to extract a more specific
`TypeInformation` based on reflection.
Review comment:
```suggestion
In many cases, the DataStream API is unable to extract a more specific
`TypeInformation` based on
[reflection](https://www.oracle.com/technical-resources/articles/java/javareflection.html).
```
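For background, reflective type extraction runs into Java type erasure: generic type arguments are not available on an instance at runtime. A minimal plain-Java sketch (hypothetical example, no Flink involved) of what gets lost:

```java
import java.util.ArrayList;
import java.util.List;

// Generic type arguments are erased at compile time, so reflection on a
// runtime object only sees the raw class. This is why a framework often
// cannot derive a more specific type from an instance alone.
public class ErasureSketch {
    public static String runtimeClassName(List<?> list) {
        // only the raw type survives; the element type is gone
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        System.out.println(runtimeClassName(strings));
        System.out.println(runtimeClassName(ints));
    }
}
```

Both calls yield the same raw class name, which is why a more specific type often has to be supplied explicitly.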
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
{{< top >}}
+Batch Runtime Mode
+------------------
+
+The *batch runtime mode* is a specialized execution mode for *bounded* Flink
programs.
+
+Generally speaking, *boundedness* is a property of a data source that tells us
whether all the records
+coming from that source are known before execution or whether new data will
show up, potentially
+indefinitely. A job, in turn, is bounded if all its sources are bounded, and
unbounded otherwise.
+
+*Streaming runtime mode*, on the other hand, can be used for both bounded and
unbounded jobs.
+
+For more information on the different execution modes, see also the
corresponding [DataStream API section]({{< ref
"docs/dev/datastream/execution_mode" >}}).
+
+The Table API & SQL planner provides a set of specialized optimizer rules and
runtime operators for either
+of the two modes.
+
+Currently, the runtime mode is not derived automatically from sources, so it must be set explicitly,
+or it is adopted from the `StreamExecutionEnvironment` when instantiating a
`StreamTableEnvironment`:
+
+{{< tabs "f786b6ff-facc-4102-8833-3669f2fdef38" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+
+// adopt mode from StreamExecutionEnvironment
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+// it will be propagated to StreamExecutionEnvironment during planning
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
+
+// adopt mode from StreamExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setRuntimeMode(RuntimeExecutionMode.BATCH)
+val tableEnv = StreamTableEnvironment.create(env)
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The following prerequisites must be met before setting the runtime mode to
`BATCH`:
+
+- All sources must declare themselves as bounded.
+
+- Currently, table sources must emit insert-only changes.
+
+- Operators need a sufficient amount of [off-heap memory]({{< ref
"docs/deployment/memory/mem_setup_tm" >}}#managed-memory)
+for sorting and other intermediate results.
+
+- All table operations must be available in batch mode. Currently, some of
them are only available in
+streaming mode. Please check the corresponding Table API & SQL pages.
+
+A batch execution has the following implications (among others):
+
+- Progressive watermarks are neither generated nor used in operators. However,
sources emit a maximum
+watermark before shutting down.
+
+- Exchanges between tasks might be blocking, depending on the
[`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config"
>}}#execution-batch-shuffle-mode).
+This can also mean lower resource requirements compared to executing
the same pipeline in streaming mode.
+
+- Checkpointing is disabled. Artificial state backends are inserted.
+
+- Table operations don't produce incremental updates but only a complete final
result that is converted
+into an insert-only changelog stream.
+
+{{< hint info >}}
+Since batch processing can be considered *a special case of stream
processing*, we recommend implementing
+a streaming pipeline first, as it is the most general implementation for both
bounded and unbounded data.
+{{< /hint >}}
+
+The following example shows how to play around with batch mode using the
[DataGen table source]({{< ref "docs/connectors/table/datagen" >}}). Many
sources offer options that implicitly make the connector bounded, for example,
+by defining an terminating offset or timestamp. In our example, we limit the
number of rows with the
Review comment:
```suggestion
by defining a terminating offset or timestamp. In our example, we limit the
number of rows with the
```
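For readers trying this out, a sketch of such a bounded DataGen table might look as follows (the schema is illustrative; the `number-of-rows` option name is taken from the DataGen connector page):

```sql
-- A DataGen table made bounded by capping the number of produced rows.
-- Without 'number-of-rows' the source is unbounded and emits rows forever.
CREATE TABLE GeneratedTable (
    name STRING,
    score INT
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);
```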
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -337,6 +324,86 @@ In particular, the section discusses how to influence the
schema derivation with
types. It covers working with event-time and watermarks. It discusses how to
declare a primary key and
changelog mode for the input and output streams.
+The example above shows how the final result is computed incrementally by
continuously emitting row-wise
+updates for each incoming record. However, in cases where the input streams
are finite (i.e. *bounded*),
+a result can be computed more efficiently by leveraging batch processing
principles. Both DataStream
Review comment:
Perhaps a brief explanation of what this means or a link to more info?
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
{{< top >}}
+Batch Runtime Mode
Review comment:
This whole section seems kind of general. Do you think it should be in
a separate section and then linked to?
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
{{< top >}}
+Batch Runtime Mode
+------------------
+
+{{< hint info >}}
+Since batch processing can be considered *a special case of stream
processing*, we recommend implementing
+a streaming pipeline first, as it is the most general implementation for both
bounded and unbounded data.
Review comment:
Should a blurb be added about how Flink runs more optimally this way?
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -337,6 +324,86 @@ In particular, the section discusses how to influence the
schema derivation with
types. It covers working with event-time and watermarks. It discusses how to
declare a primary key and
changelog mode for the input and output streams.
+The example above shows how the final result is computed incrementally by
continuously emitting row-wise
+updates for each incoming record. However, in cases where the input streams
are finite (i.e. *bounded*),
+a result can be computed more efficiently by leveraging batch processing
principles. Both DataStream
+API and Table API offer a specialized *batch runtime mode*.
+
+The following example illustrates that the unified pipeline can process
both batch and streaming data
+simply by switching a flag.
+
+{{< tabs "61a8a0b8-c38b-48f3-b52e-563546139380" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+// setup DataStream API
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// set the batch runtime mode
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// uncomment this for streaming mode
+// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+// setup Table API
+// the table environment adopts the runtime mode during initialization
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// define the same pipeline as above
+
+// prints in BATCH mode:
+// +I[Bob, 10]
+// +I[Alice, 112]
+
+// prints in STREAMING mode:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
Review comment:
Wondering whether this is best left as code comments or whether it would
be better to explain it in words
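If words alone feel too abstract, the changelog kinds in those comments can also be illustrated with a framework-free sketch. The following plain-Java example (hypothetical, not Flink API) mimics how a streaming aggregation emits row-wise updates while a batch aggregation emits only the final insert-only result:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of the two output modes: streaming emits a
// changelog (+I insert, -U retract, +U update), batch emits only the
// final insert-only result.
public class ChangelogSketch {

    public static List<String> streamingSum(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (Map.Entry<String, Integer> r : records) {
            Integer old = sums.get(r.getKey());
            int updated = (old == null ? 0 : old) + r.getValue();
            if (old == null) {
                changelog.add("+I[" + r.getKey() + ", " + updated + "]");
            } else {
                // an existing row is retracted and re-emitted with the new sum
                changelog.add("-U[" + r.getKey() + ", " + old + "]");
                changelog.add("+U[" + r.getKey() + ", " + updated + "]");
            }
            sums.put(r.getKey(), updated);
        }
        return changelog;
    }

    public static List<String> batchSum(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        List<String> result = new ArrayList<>();
        sums.forEach((k, v) -> result.add("+I[" + k + ", " + v + "]"));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("Alice", 12), Map.entry("Bob", 10), Map.entry("Alice", 100));
        System.out.println(streamingSum(input));
        System.out.println(batchSum(input));
    }
}
```

In streaming mode every incoming record retracts (`-U`) and re-adds (`+U`) the affected row, while batch mode materializes a single `+I` per key.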
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]