infoverload commented on a change in pull request #16871:
URL: https://github.com/apache/flink/pull/16871#discussion_r691987961



##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -506,22 +573,17 @@ behavior is slightly different between Table API and 
DataStream API.
 
 The DataStream API's `StreamExecutionEnvironment` acts as a _builder pattern_ 
to construct

Review comment:
       ```suggestion
   The DataStream API's `StreamExecutionEnvironment` uses a _builder pattern_ 
to construct
   ```

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -2341,6 +2694,16 @@ correctly via the DataStream API's reflective type 
extraction facilities. If the
 `TypeInformation` is a `CompositeType`, it will be flattened in the first 
level when deriving a table's
 schema.
 
+{{< hint warning >}}
+Many times the DataStream API is unable to extract a more specific 
`TypeInformation` based on reflection.

Review comment:
       ```suggestion
   Many times the DataStream API is unable to extract a more specific 
`TypeInformation` based on 
[reflection](https://www.oracle.com/technical-resources/articles/java/javareflection.html).
   ```
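    Maybe also worth pointing readers at the usual workaround? A minimal sketch (the names are made up, not part of this PR): when reflection cannot recover a generic element type, e.g. because a lambda erases it, the type can be declared explicitly via `DataStream#returns`:
    ```java
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // the lambda erases Tuple2<String, Integer>, so reflection alone cannot recover it;
    // declare the element type explicitly instead
    DataStream<Tuple2<String, Integer>> scores = env
        .fromElements("Alice", "Bob")
        .map(name -> Tuple2.of(name, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
    ```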

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
 
 {{< top >}}
 
+Batch Runtime Mode
+------------------
+
+The *batch runtime mode* is a specialized execution mode for *bounded* Flink 
programs.
+
+Generally speaking, *boundedness* is a property of a data source that tells us 
whether all the records
+coming from that source are known before execution or whether new data will 
show up, potentially
+indefinitely. A job, in turn, is bounded if all its sources are bounded, and 
unbounded otherwise.
+
+*Streaming runtime mode*, on the other hand, can be used for both bounded and 
unbounded jobs.
+
+For more information on the different execution modes, see also the 
corresponding [DataStream API section]({{< ref 
"docs/dev/datastream/execution_mode" >}}).
+
+The Table API & SQL planner provides a set of specialized optimizer rules and 
runtime operators for either
+of the two modes.
+
+Currently, the runtime mode is not derived automatically from the sources; it must be set explicitly,
+or it will be adopted from the `StreamExecutionEnvironment` when instantiating a `StreamTableEnvironment`:
+
+{{< tabs "f786b6ff-facc-4102-8833-3669f2fdef38" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+
+// adopt mode from StreamExecutionEnvironment
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+// it will be propagated to StreamExecutionEnvironment during planning
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
+
+// adopt mode from StreamExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setRuntimeMode(RuntimeExecutionMode.BATCH)
+val tableEnv = StreamTableEnvironment.create(env)
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+One must meet the following prerequisites before setting the runtime mode to 
`BATCH`:
+
+- All sources must declare themselves as bounded.
+
+- Currently, table sources must emit insert-only changes.
+
+- Operators need a sufficient amount of [off-heap memory]({{< ref 
"docs/deployment/memory/mem_setup_tm" >}}#managed-memory)
+for sorting and other intermediate results.
+
+- All table operations must be available in batch mode. Currently, some of 
them are only available in
+streaming mode. Please check the corresponding Table API & SQL pages.
+
+A batch execution has the following implications (among others):
+
+- Progressive watermarks are neither generated nor used in operators. However, 
sources emit a maximum
+watermark before shutting down.
+
+- Exchanges between tasks might be blocking according to the 
[`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode).
+This can also mean lower resource requirements compared to executing the same pipeline in streaming mode.
+
+- Checkpointing is disabled. Artificial state backends are inserted.
+
+Table operations don't produce incremental updates but only a complete final result that is
+converted into an insert-only changelog stream.
+
+{{< hint info >}}
+Since batch processing can be considered *a special case of stream processing*, we recommend implementing
+a streaming pipeline first, as it is the most general implementation for both bounded and unbounded data.
+{{< /hint >}}
+
+The following example shows how to play around with batch mode using the 
[DataGen table source]({{< ref "docs/connectors/table/datagen" >}}). Many 
sources offer options that implicitly make the connector bounded, for example,
+by defining an terminating offset or timestamp. In our example, we limit the 
number of rows with the

Review comment:
       ```suggestion
   by defining a terminating offset or timestamp. In our example, we limit the 
number of rows with the
   ```
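    For what it's worth, a sketch of such a bounded table (the table name and schema are made up; assumes the DataGen connector's `number-of-rows` option):
    ```java
    // 'number-of-rows' implicitly makes the otherwise unbounded DataGen source bounded
    tableEnv.executeSql(
        "CREATE TABLE GeneratedTable (name STRING, score INT) WITH ("
            + " 'connector' = 'datagen',"
            + " 'number-of-rows' = '10'"
            + ")");
    ```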

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -337,6 +324,86 @@ In particular, the section discusses how to influence the 
schema derivation with
 types. It covers working with event-time and watermarks. It discusses how to 
declare a primary key and
 changelog mode for the input and output streams.
 
+The example above shows how the final result is computed incrementally by 
continuously emitting row-wise
+updates for each incoming record. However, in cases where the input streams 
are finite (i.e. *bounded*),
+a result can be computed more efficiently by leveraging batch processing 
principles. Both DataStream

Review comment:
       Perhaps a brief explanation of what this means or a link to more info? 
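    One option: this page already links the [DataStream API section]({{< ref "docs/dev/datastream/execution_mode" >}}) further down; referencing it here too would give readers the background, e.g. that a bounded input lets the runtime sort and aggregate the data once instead of keeping per-record state and emitting retractions.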

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
 
 {{< top >}}
 
+Batch Runtime Mode

Review comment:
    This whole section seems kind of general. Do you think it should live in a separate section and then be linked to?

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -574,6 +636,297 @@ env.execute()
 
 {{< top >}}
 
+Batch Runtime Mode
+------------------
+
+The *batch runtime mode* is a specialized execution mode for *bounded* Flink 
programs.
+
+Generally speaking, *boundedness* is a property of a data source that tells us 
whether all the records
+coming from that source are known before execution or whether new data will 
show up, potentially
+indefinitely. A job, in turn, is bounded if all its sources are bounded, and 
unbounded otherwise.
+
+*Streaming runtime mode*, on the other hand, can be used for both bounded and 
unbounded jobs.
+
+For more information on the different execution modes, see also the 
corresponding [DataStream API section]({{< ref 
"docs/dev/datastream/execution_mode" >}}).
+
+The Table API & SQL planner provides a set of specialized optimizer rules and 
runtime operators for either
+of the two modes.
+
+Currently, the runtime mode is not derived automatically from the sources; it must be set explicitly,
+or it will be adopted from the `StreamExecutionEnvironment` when instantiating a `StreamTableEnvironment`:
+
+{{< tabs "f786b6ff-facc-4102-8833-3669f2fdef38" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+
+// adopt mode from StreamExecutionEnvironment
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+// it will be propagated to StreamExecutionEnvironment during planning
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
+
+// adopt mode from StreamExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setRuntimeMode(RuntimeExecutionMode.BATCH)
+val tableEnv = StreamTableEnvironment.create(env)
+
+// or
+
+// set mode explicitly for StreamTableEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+One must meet the following prerequisites before setting the runtime mode to 
`BATCH`:
+
+- All sources must declare themselves as bounded.
+
+- Currently, table sources must emit insert-only changes.
+
+- Operators need a sufficient amount of [off-heap memory]({{< ref 
"docs/deployment/memory/mem_setup_tm" >}}#managed-memory)
+for sorting and other intermediate results.
+
+- All table operations must be available in batch mode. Currently, some of 
them are only available in
+streaming mode. Please check the corresponding Table API & SQL pages.
+
+A batch execution has the following implications (among others):
+
+- Progressive watermarks are neither generated nor used in operators. However, 
sources emit a maximum
+watermark before shutting down.
+
+- Exchanges between tasks might be blocking according to the 
[`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode).
+This can also mean lower resource requirements compared to executing the same pipeline in streaming mode.
+
+- Checkpointing is disabled. Artificial state backends are inserted.
+
+Table operations don't produce incremental updates but only a complete final result that is
+converted into an insert-only changelog stream.
+
+{{< hint info >}}
+Since batch processing can be considered *a special case of stream processing*, we recommend implementing
+a streaming pipeline first, as it is the most general implementation for both bounded and unbounded data.

Review comment:
       Should a blurb be added about how Flink runs more optimally this way? 
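    Maybe, though the "implications" list above already covers most of it: blocking exchanges, no checkpointing, and a single final result instead of incremental updates. A one-liner here along the lines of "this lets Flink apply additional optimizations when it knows the input is bounded" might be enough.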

##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -337,6 +324,86 @@ In particular, the section discusses how to influence the 
schema derivation with
 types. It covers working with event-time and watermarks. It discusses how to 
declare a primary key and
 changelog mode for the input and output streams.
 
+The example above shows how the final result is computed incrementally by 
continuously emitting row-wise
+updates for each incoming record. However, in cases where the input streams 
are finite (i.e. *bounded*),
+a result can be computed more efficiently by leveraging batch processing 
principles. Both DataStream
+API and Table API offer a specialized *batch runtime mode*.
+
+The following example illustrates that the unified pipeline can process both batch and streaming data
+just by switching a flag.
+
+{{< tabs "61a8a0b8-c38b-48f3-b52e-563546139380" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+// setup DataStream API
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// set the batch runtime mode
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// uncomment this for streaming mode
+// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+// setup Table API
+// the table environment adopts the runtime mode during initialization
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// define the same pipeline as above
+
+// prints in BATCH mode:
+// +I[Bob, 10]
+// +I[Alice, 112]
+
+// prints in STREAMING mode:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]

Review comment:
       Wondering whether this is best left as code comments or whether it would 
be better to explain it in words
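    If it helps to spell it out: each prefix is a changelog `RowKind`. A sketch of how the STREAMING output decodes (`Row.ofKind` is shown only to name the kinds; it is not part of the pipeline):
    ```java
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    Row.ofKind(RowKind.INSERT, "Alice", 12);         // +I  first result for Alice
    Row.ofKind(RowKind.INSERT, "Bob", 10);           // +I  first result for Bob
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12);  // -U  retracts Alice's previous result
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 112);  // +U  emits Alice's updated result
    ```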




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

