This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/amoro-site.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 2be0296 deploy: 95aa6bf87d3b104bda6a0389947589aa9a47c68d
2be0296 is described below
commit 2be029669fa8a24db2c04398c73701c8ad4416ce
Author: zhoujinsong <[email protected]>
AuthorDate: Fri Aug 2 06:07:37 2024 +0000
deploy: 95aa6bf87d3b104bda6a0389947589aa9a47c68d
---
output/docs/0.6.1/engines/index.xml | 2 +-
output/docs/0.6.1/flink-datastream/index.html | 348 ++++++++++----------------
output/docs/0.6.1/index.html | 2 +-
output/docs/0.6.1/index.xml | 2 +-
output/docs/0.6.1/search.json | 2 +-
5 files changed, 136 insertions(+), 220 deletions(-)
diff --git a/output/docs/0.6.1/engines/index.xml
b/output/docs/0.6.1/engines/index.xml
index 0d4f610..ffa63ce 100644
--- a/output/docs/0.6.1/engines/index.xml
+++ b/output/docs/0.6.1/engines/index.xml
@@ -12,7 +12,7 @@
<link>https://amoro.apache.org/docs/0.6.1/flink-datastream/</link>
<pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate>
<guid>https://amoro.apache.org/docs/0.6.1/flink-datastream/</guid>
- <description>Flink DataStream Add maven dependency To add a dependency
on Mixed-format flink connector in Maven, add the following to your
pom.xml:
&lt;dependencies&gt; ... &lt;dependency&gt;
&lt;groupId&gt;com.netease.arctic&lt;/groupId&gt; &lt;!--
For example: amoro-flink-1.15 --&gt;
&lt;artifactId&gt;amoro-flink-${flink.minor-version}&lt;/artifactId&gt;
&lt;!-- For example: 0.6.1 --&gt; &lt;version&gt;${ [...]
+ <description>Flink DataStream Reading with DataStream Amoro supports
reading data in Batch or Streaming mode through Java API.
Batch mode Using
Batch mode to read the full and incremental data in the
FileStore.
Non-primary key tables support reading full data in batch mode,
snapshot data with a specified snapshot-id or timestamp, and incremental data
with a specified snapshot interval. The primary key table temporarily only
supports reading the current full amount and later [...]
</item>
<item>
<title>Flink DDL</title>
diff --git a/output/docs/0.6.1/flink-datastream/index.html
b/output/docs/0.6.1/flink-datastream/index.html
index 65eacdc..f1ee51e 100644
--- a/output/docs/0.6.1/flink-datastream/index.html
+++ b/output/docs/0.6.1/flink-datastream/index.html
@@ -539,20 +539,7 @@
<div id="content" class="markdown-body">
<div class="margin-for-toc"><h1 id="flink-datastream">Flink
DataStream</h1>
-<h2 id="add-maven-dependency">Add maven dependency</h2>
-<p>To add a dependency on Mixed-format flink connector in Maven, add the
following to your pom.xml:</p>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-xml" data-lang="xml"><span style="display:flex;"><span><span
style="color:#f92672"><dependencies></span>
-</span></span><span style="display:flex;"><span> ...
-</span></span><span style="display:flex;"><span> <span
style="color:#f92672"><dependency></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#f92672"><groupId></span>com.netease.arctic<span
style="color:#f92672"></groupId></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e"><!-- For example: amoro-flink-1.15 --></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#f92672"><artifactId></span>amoro-flink-${flink.minor-version}<span
style="color:#f92672"></artifactId></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e"><!-- For example: 0.6.1 --></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#f92672"><version></span>${amoro-mixed-format-flink.version}<span
style="color:#f92672"></version></span>
-</span></span><span style="display:flex;"><span> <span
style="color:#f92672"></dependency></span>
-</span></span><span style="display:flex;"><span> ...
-</span></span><span style="display:flex;"><span><span
style="color:#f92672"></dependencies></span>
-</span></span></code></pre></div><h2 id="reading-with-datastream">Reading with
DataStream</h2>
+<h2 id="reading-with-datastream">Reading with DataStream</h2>
<p>Amoro supports reading data in Batch or Streaming mode through Java API.</p>
<h3 id="batch-mode">Batch mode</h3>
<p>Using Batch mode to read the full and incremental data in the FileStore.</p>
@@ -560,46 +547,32 @@
<li>Non-primary key tables support reading full data in batch mode, snapshot
data with a specified snapshot-id or timestamp, and incremental data with a
specified snapshot interval.</li>
<li>The primary key table temporarily only supports reading the current full
amount and later CDC data.</li>
</ul>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.table.FlinkSource;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> java.util.HashMap;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> java.util.Map;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span>
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span> <span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span>
<span style="color:#66d9ef">void</span> <span
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span>
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>
StreamExecutionEnvironment env <span style="color:#f92672">=</span>
StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span> InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableIdentifier
tableId <span style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
-</span></span><span style="display:flex;"><span> ArcticTableLoader
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> Map<span
style="color:#f92672"><</span>String, String<span
style="color:#f92672">></span> properties <span
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span>
HashMap<span style="color:#f92672"><></span>();
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Default is true</span>
-</span></span><span style="display:flex;"><span> properties.<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"streaming"</span>, <span
style="color:#e6db74">"false"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
batch <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
FlinkSource.<span style="color:#a6e22e">forRowData</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">env</span>(env)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">properties</span>(properties)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// print all data read</span>
-</span></span><span style="display:flex;"><span> batch.<span
style="color:#a6e22e">print</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span> env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Mixed-format table batch read"</span>);
-</span></span><span style="display:flex;"><span> }
-</span></span><span style="display:flex;"><span>}
+<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span
style="display:flex;"><span>StreamExecutionEnvironment env <span
style="color:#f92672">=</span> StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
+</span></span><span style="display:flex;"><span> InternalCatalogBuilder
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span
style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader
<span style="color:#f92672">=</span> AmoroTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>Map<span
style="color:#f92672"><</span>String, String<span
style="color:#f92672">></span> properties <span
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span>
HashMap<span style="color:#f92672"><></span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Default is true.</span>
+</span></span><span style="display:flex;"><span>properties.<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"streaming"</span>, <span
style="color:#e6db74">"false"</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
batch <span style="color:#f92672">=</span>
+</span></span><span style="display:flex;"><span> FlinkSource.<span
style="color:#a6e22e">forRowData</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">env</span>(env)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// The primary key table only supports reading the
current full amount and later CDC data temporarily, without the properties
parameter.</span>
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">properties</span>(properties)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Print all data read</span>
+</span></span><span style="display:flex;"><span>batch.<span
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Amoro Batch Read"</span>);
</span></span></code></pre></div><p>The properties map supports the following keys,
<strong>currently valid only for non-primary key tables</strong>:</p>
<table>
<thead>
@@ -652,188 +625,132 @@
<h3 id="streaming-mode">Streaming mode</h3>
<p>Amoro supports reading incremental data in FileStore or LogStore through
the Java API in Streaming mode.</p>
<h3 id="streaming-mode-logstore">Streaming mode (LogStore)</h3>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.read.source.log.kafka.LogKafkaSource;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.util.ArcticUtils;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.ArcticTable;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.api.common.eventtime.WatermarkStrategy;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.iceberg.Schema;
+<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span
style="display:flex;"><span>StreamExecutionEnvironment env <span
style="color:#f92672">=</span> StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
+</span></span><span style="display:flex;"><span> InternalCatalogBuilder
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span
style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader
<span style="color:#f92672">=</span> AmoroTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span>
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span> <span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span>
<span style="color:#66d9ef">void</span> <span
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span>
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>
StreamExecutionEnvironment env <span style="color:#f92672">=</span>
StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span> InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
+</span></span><span style="display:flex;"><span>AmoroTable table <span
style="color:#f92672">=</span> AmoroUtils.<span
style="color:#a6e22e">load</span>(tableLoader);
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Read all fields of the table. To read only some fields, construct the
schema yourself, for example:</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Schema userSchema = new Schema(new ArrayList<Types.NestedField>()
{{</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
add(Types.NestedField.optional(0, "f_boolean",
Types.BooleanType.get()));</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
add(Types.NestedField.optional(1, "f_int",
Types.IntegerType.get()));</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
}});</span>
+</span></span><span style="display:flex;"><span>Schema schema <span
style="color:#f92672">=</span> table.<span
style="color:#a6e22e">schema</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableIdentifier
tableId <span style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
-</span></span><span style="display:flex;"><span> ArcticTableLoader
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
-----------Hidden Kafka--------------</span>
+</span></span><span style="display:flex;"><span>LogKafkaSource source <span
style="color:#f92672">=</span> LogKafkaSource.<span
style="color:#a6e22e">builder</span>(schema, table.<span
style="color:#a6e22e">properties</span>()).<span
style="color:#a6e22e">build</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> ArcticTable table
<span style="color:#f92672">=</span> ArcticUtils.<span
style="color:#a6e22e">loadArcticTable</span>(tableLoader);
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Read table All fields. If you only read some fields,
you can construct the schema yourself, for example:</span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Schema userSchema = new Schema(new
ArrayList<Types.NestedField>() {{</span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// add(Types.NestedField.optional(0,
"f_boolean", Types.BooleanType.get()));</span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// add(Types.NestedField.optional(1, "f_int",
Types.IntegerType.get()));</span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// }});</span>
-</span></span><span style="display:flex;"><span> Schema schema <span
style="color:#f92672">=</span> table.<span
style="color:#a6e22e">schema</span>();
+</span></span><span style="display:flex;"><span>or
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// -----------Hidden Kafka--------------</span>
-</span></span><span style="display:flex;"><span> LogKafkaSource source
<span style="color:#f92672">=</span> LogKafkaSource.<span
style="color:#a6e22e">builder</span>(schema, table.<span
style="color:#a6e22e">properties</span>()).<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
-----------Hidden Pulsar--------------</span>
+</span></span><span style="display:flex;"><span>LogPulsarSource source <span
style="color:#f92672">=</span> LogPulsarSource.<span
style="color:#a6e22e">builder</span>(schema, table.<span
style="color:#a6e22e">properties</span>()).<span
style="color:#a6e22e">build</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
stream <span style="color:#f92672">=</span> env.<span
style="color:#a6e22e">fromSource</span>(source, WatermarkStrategy.<span
style="color:#a6e22e">noWatermarks</span>(), <span
style="color:#e6db74">"Log Source"</span>);
+</span></span><span style="display:flex;"><span>DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
stream <span style="color:#f92672">=</span> env.<span
style="color:#a6e22e">fromSource</span>(source, WatermarkStrategy.<span
style="color:#a6e22e">noWatermarks</span>(), <span
style="color:#e6db74">"Log Source"</span>);
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Print all the read data</span>
-</span></span><span style="display:flex;"><span> stream.<span
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Print all the read data</span>
+</span></span><span style="display:flex;"><span>stream.<span
style="color:#a6e22e">print</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span> env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Mixed-format table streaming read"</span>);
-</span></span><span style="display:flex;"><span> }
-</span></span><span style="display:flex;"><span>}
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Amoro Stream Read"</span>);
</span></span></code></pre></div><h3 id="streaming-mode-filestore">Streaming
mode (FileStore)</h3>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.table.FlinkSource;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> java.util.HashMap;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> java.util.Map;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span>
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span> <span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span>
<span style="color:#66d9ef">void</span> <span
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span>
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>
StreamExecutionEnvironment env <span style="color:#f92672">=</span>
StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span> InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableIdentifier
tableId <span style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
-</span></span><span style="display:flex;"><span> ArcticTableLoader
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> Map<span
style="color:#f92672"><</span>String, String<span
style="color:#f92672">></span> properties <span
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span>
HashMap<span style="color:#f92672"><></span>();
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Default value is true</span>
-</span></span><span style="display:flex;"><span> properties.<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"streaming"</span>, <span
style="color:#e6db74">"true"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
stream <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
FlinkSource.<span style="color:#a6e22e">forRowData</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">env</span>(env)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">properties</span>(properties)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Print all read data</span>
-</span></span><span style="display:flex;"><span> stream.<span
style="color:#a6e22e">print</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span> env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Mixed-format table streaming Read"</span>);
-</span></span><span style="display:flex;"><span> }
-</span></span><span style="display:flex;"><span>}
+<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span
style="display:flex;"><span>StreamExecutionEnvironment env <span
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>Map<span
style="color:#f92672"><</span>String, String<span
style="color:#f92672">></span> properties <span
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span>
HashMap<span style="color:#f92672"><></span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
default is true </span>
+</span></span><span style="display:flex;"><span>properties.<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"streaming"</span>, <span
style="color:#e6db74">"true"</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
stream <span style="color:#f92672">=</span>
+</span></span><span style="display:flex;"><span> FlinkSource.<span
style="color:#a6e22e">forRowData</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">env</span>(env)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// The primary key table only supports reading the
current full amount and later CDC data for the time being, without the
properties parameter</span>
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">properties</span>(properties)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Print all read data</span>
+</span></span><span style="display:flex;"><span>stream.<span
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Amoro Stream Read"</span>);
</span></span></code></pre></div><p>The DataStream API supports reading both
primary key tables and non-primary key tables. For the configuration items
supported by properties, refer to the <a href="../flink-dml/">Hint Options
chapter</a> of Querying With SQL.</p>
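<p>As a minimal sketch of how the properties map might be assembled before it is passed to <code>properties(...)</code>: the only key taken from this page is <code>streaming</code>; the class and method names are hypothetical, and the assumption that <code>"false"</code> selects a bounded (batch) read should be checked against the Hint Options chapter.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Amoro API.
public class ReadProperties {

    // Builds the map handed to the source builder's properties(...) call.
    // Assumption: "streaming" = "false" requests a bounded read, while
    // "true" (the documented default) requests a continuous one.
    public static Map<String, String> forMode(boolean streaming) {
        Map<String, String> properties = new HashMap<>();
        properties.put("streaming", Boolean.toString(streaming));
        return properties;
    }

    public static void main(String[] args) {
        // Print the map that would be passed for a streaming read.
        System.out.println(forMode(true));
    }
}
```

<p>Keeping the map construction in one place makes it easier to add further hint options later without touching the source builder chain.</p>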
<h2 id="writing-with-datastream">Writing with DataStream</h2>
<p>Amoro tables support writing data to LogStore or FileStore through the Java
API.</p>
<h3 id="overwrite-data">Overwrite data</h3>
<p>Amoro tables currently support dynamically overwriting existing data only
in non-primary key tables.</p>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.write.FlinkSink;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.api.DataTypes;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.api.TableSchema;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span>
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span> <span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span>
<span style="color:#66d9ef">void</span> <span
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span>
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Build your data stream</span>
-</span></span><span style="display:flex;"><span> DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
input <span style="color:#f92672">=</span> <span
style="color:#66d9ef">null</span>;
-</span></span><span style="display:flex;"><span>
StreamExecutionEnvironment env <span style="color:#f92672">=</span>
StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span> InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableIdentifier
tableId <span style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
-</span></span><span style="display:flex;"><span> ArcticTableLoader
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableSchema
flinkSchema <span style="color:#f92672">=</span> TableSchema.<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"id"</span>, DataTypes.<span
style="color:#a6e22e">INT</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"name"</span>, DataTypes.<span
style="color:#a6e22e">STRING</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"op_time"</span>, DataTypes.<span
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> FlinkSink
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">forRowData</span>(input)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">overwrite</span>(<span style="color:#66d9ef">true</span>)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">flinkSchema</span>(flinkSchema)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span> env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Mixed-format table overwrite"</span>);
-</span></span><span style="display:flex;"><span> }
-</span></span><span style="display:flex;"><span>}
+<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span
style="display:flex;"><span>DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
input <span style="color:#f92672">=</span> ...;
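+</span></span><span style="display:flex;"><span>StreamExecutionEnvironment env
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span> ...;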
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableSchema FLINK_SCHEMA <span
style="color:#f92672">=</span> TableSchema.<span
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"id"</span>, DataTypes.<span
style="color:#a6e22e">INT</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"name"</span>, DataTypes.<span
style="color:#a6e22e">STRING</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"op_time"</span>, DataTypes.<span
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>FlinkSink
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">forRowData</span>(input)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">overwrite</span>(<span style="color:#66d9ef">true</span>)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">flinkSchema</span>(FLINK_SCHEMA)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">//
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Amoro Overwrite"</span>);
</span></span></code></pre></div><h3 id="appending-data">Appending data</h3>
<p>Through the Java API, you can specify whether data written to an Amoro
table goes to FileStore, LogStore, or both.</p>
-<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.util.ArcticUtils;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.flink.write.FlinkSink;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.ArcticTable;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span>
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.api.DataTypes;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.api.TableSchema;
-</span></span><span style="display:flex;"><span><span
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span>
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span> <span
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span>
<span style="color:#66d9ef">void</span> <span
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span>
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span> <span
style="color:#75715e">// Build your data stream</span>
-</span></span><span style="display:flex;"><span> DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
input <span style="color:#f92672">=</span> <span
style="color:#66d9ef">null</span>;
-</span></span><span style="display:flex;"><span>
StreamExecutionEnvironment env <span style="color:#f92672">=</span>
StreamExecutionEnvironment.<span
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span> InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">metastoreUrl</span>(<span
style="color:#e6db74">"thrift://<url>:<port>/<catalog_name>"</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableIdentifier
tableId <span style="color:#f92672">=</span> TableIdentifier.<span
style="color:#a6e22e">of</span>(<span
style="color:#e6db74">"catalog_name"</span>, <span
style="color:#e6db74">"database_name"</span>, <span
style="color:#e6db74">"test_table"</span>);
-</span></span><span style="display:flex;"><span> ArcticTableLoader
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+<div class="highlight"><pre tabindex="0"
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
class="language-java" data-lang="java"><span
style="display:flex;"><span>DataStream<span
style="color:#f92672"><</span>RowData<span style="color:#f92672">></span>
input <span style="color:#f92672">=</span> ...;
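+</span></span><span style="display:flex;"><span>StreamExecutionEnvironment env
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder
catalogBuilder <span style="color:#f92672">=</span> ...;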
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader
<span style="color:#f92672">=</span> ...;
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> TableSchema
flinkSchema <span style="color:#f92672">=</span> TableSchema.<span
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"id"</span>, DataTypes.<span
style="color:#a6e22e">INT</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"name"</span>, DataTypes.<span
style="color:#a6e22e">STRING</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"op_time"</span>, DataTypes.<span
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>TableSchema FLINK_SCHEMA <span
style="color:#f92672">=</span> TableSchema.<span
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"id"</span>, DataTypes.<span
style="color:#a6e22e">INT</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"name"</span>, DataTypes.<span
style="color:#a6e22e">STRING</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">field</span>(<span
style="color:#e6db74">"op_time"</span>, DataTypes.<span
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> ArcticTable table
<span style="color:#f92672">=</span> ArcticUtils.<span
style="color:#a6e22e">loadArcticTable</span>(tableLoader);
+</span></span><span style="display:flex;"><span>AmoroTable table <span
style="color:#f92672">=</span> AmoroUtils.<span
style="color:#a6e22e">loadAmoroTable</span>(tableLoader);
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> table.<span
style="color:#a6e22e">properties</span>().<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"arctic.emit.mode"</span>, <span
style="color:#e6db74">"log,file"</span>);
+</span></span><span style="display:flex;"><span>table.<span
style="color:#a6e22e">properties</span>().<span
style="color:#a6e22e">put</span>(<span
style="color:#e6db74">"arctic.emit.mode"</span>, <span
style="color:#e6db74">"log,file"</span>);
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> FlinkSink
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">forRowData</span>(input)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">table</span>(table)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">flinkSchema</span>(flinkSchema)
-</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>FlinkSink
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">forRowData</span>(input)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">table</span>(table)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">flinkSchema</span>(FLINK_SCHEMA)
+</span></span><span style="display:flex;"><span> .<span
style="color:#a6e22e">build</span>();
</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span> env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Mixed-format table append"</span>);
-</span></span><span style="display:flex;"><span> }
-</span></span><span style="display:flex;"><span>}
+</span></span><span style="display:flex;"><span>env.<span
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">"Test
Amoro Append"</span>);
</span></span></code></pre></div><p>The DataStream API supports writing to
both primary key tables and non-primary key tables. For the configuration
items supported by properties, refer to the <a
href="../flink-dml/">Hint Options chapter</a> of Writing With SQL.</p>
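<p>The append example sets the emit mode to <code>"log,file"</code>. The sketch below shows one way that value might be composed from two flags. The <code>EmitMode</code> class is hypothetical, and it assumes that <code>"log"</code> and <code>"file"</code> are also accepted on their own, which this page does not state explicitly.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Amoro API.
public class EmitMode {

    // Composes the comma-separated emit-mode value used in the append example.
    // Assumption: "log" targets LogStore and "file" targets FileStore.
    public static String of(boolean toLogStore, boolean toFileStore) {
        List<String> targets = new ArrayList<>();
        if (toLogStore) {
            targets.add("log");
        }
        if (toFileStore) {
            targets.add("file");
        }
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("Select at least one store");
        }
        return String.join(",", targets);
    }
}
```

<p>The result could then be stored under the same property key that the append example uses, in place of the hard-coded string.</p>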
<blockquote>
<p><strong>TIPS</strong></p>
@@ -861,7 +778,6 @@
<div id="full">
<nav id="TableOfContents">
<ul>
- <li><a href="#add-maven-dependency">Add maven dependency</a></li>
<li><a href="#reading-with-datastream">Reading with DataStream</a>
<ul>
<li><a href="#batch-mode">Batch mode</a></li>
diff --git a/output/docs/0.6.1/index.html b/output/docs/0.6.1/index.html
index 7b7d037..286741c 100644
--- a/output/docs/0.6.1/index.html
+++ b/output/docs/0.6.1/index.html
@@ -32,7 +32,7 @@
<!DOCTYPE html>
<html>
<head>
- <meta name="generator" content="Hugo 0.127.0">
+ <meta name="generator" content="Hugo 0.130.0">
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
diff --git a/output/docs/0.6.1/index.xml b/output/docs/0.6.1/index.xml
index a68296f..eb815ce 100644
--- a/output/docs/0.6.1/index.xml
+++ b/output/docs/0.6.1/index.xml
@@ -61,7 +61,7 @@
<link>https://amoro.apache.org/docs/0.6.1/flink-datastream/</link>
<pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate>
<guid>https://amoro.apache.org/docs/0.6.1/flink-datastream/</guid>
- <description>Flink DataStream Add maven dependency To add a dependency
on Mixed-format flink connector in Maven, add the following to your
pom.xml:
&lt;dependencies&gt; ... &lt;dependency&gt;
&lt;groupId&gt;com.netease.arctic&lt;/groupId&gt; &lt;!--
For example: amoro-flink-1.15 --&gt;
&lt;artifactId&gt;amoro-flink-${flink.minor-version}&lt;/artifactId&gt;
&lt;!-- For example: 0.6.1 --&gt; &lt;version&gt;${ [...]
+ <description>Flink DataStream Reading with DataStream Amoro supports
reading data in Batch or Streaming mode through Java API.
Batch mode Using
Batch mode to read the full and incremental data in the
FileStore.
Non-primary key tables support reading full data in batch mode,
snapshot data with a specified snapshot-id or timestamp, and incremental data
with a specified snapshot interval. The primary key table temporarily only
supports reading the current full amount and later [...]
</item>
<item>
<title>Flink DDL</title>
diff --git a/output/docs/0.6.1/search.json b/output/docs/0.6.1/search.json
index 92941a2..2082274 100644
--- a/output/docs/0.6.1/search.json
+++ b/output/docs/0.6.1/search.json
@@ -1 +1 @@
-[{"categories":null,"content":"Paimon Format Paimon format refers to Apache
Paimon table. Paimon is a streaming data lake platform with high-speed data
ingestion, changelog tracking and efficient real-time analytics.\nBy
registering Paimon’s catalog with Amoro, users can view information such as
Schema, Options, Files, Snapshots, DDLs, Compaction information, and more for
Paimon tables. Furthermore, they can operate on Paimon tables using Spark SQL
in the Terminal. The current supported [...]
\ No newline at end of file
+[{"categories":null,"content":"Paimon Format Paimon format refers to Apache
Paimon table. Paimon is a streaming data lake platform with high-speed data
ingestion, changelog tracking and efficient real-time analytics.\nBy
registering Paimon’s catalog with Amoro, users can view information such as
Schema, Options, Files, Snapshots, DDLs, Compaction information, and more for
Paimon tables. Furthermore, they can operate on Paimon tables using Spark SQL
in the Terminal. The current supported [...]
\ No newline at end of file