This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/amoro-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2be0296  deploy: 95aa6bf87d3b104bda6a0389947589aa9a47c68d
2be0296 is described below

commit 2be029669fa8a24db2c04398c73701c8ad4416ce
Author: zhoujinsong <[email protected]>
AuthorDate: Fri Aug 2 06:07:37 2024 +0000

    deploy: 95aa6bf87d3b104bda6a0389947589aa9a47c68d
---
 output/docs/0.6.1/engines/index.xml           |   2 +-
 output/docs/0.6.1/flink-datastream/index.html | 348 ++++++++++----------------
 output/docs/0.6.1/index.html                  |   2 +-
 output/docs/0.6.1/index.xml                   |   2 +-
 output/docs/0.6.1/search.json                 |   2 +-
 5 files changed, 136 insertions(+), 220 deletions(-)

diff --git a/output/docs/0.6.1/engines/index.xml 
b/output/docs/0.6.1/engines/index.xml
index 0d4f610..ffa63ce 100644
--- a/output/docs/0.6.1/engines/index.xml
+++ b/output/docs/0.6.1/engines/index.xml
@@ -12,7 +12,7 @@
       <link>https://amoro.apache.org/docs/0.6.1/flink-datastream/</link>
       <pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate>
       <guid>https://amoro.apache.org/docs/0.6.1/flink-datastream/</guid>
-      <description>Flink DataStream Add maven dependency To add a dependency 
on Mixed-format flink connector in Maven, add the following to your 
pom.xml:&#xA;&amp;lt;dependencies&amp;gt; ... &amp;lt;dependency&amp;gt; 
&amp;lt;groupId&amp;gt;com.netease.arctic&amp;lt;/groupId&amp;gt; &amp;lt;!-- 
For example: amoro-flink-1.15 --&amp;gt; 
&amp;lt;artifactId&amp;gt;amoro-flink-${flink.minor-version}&amp;lt;/artifactId&amp;gt;
 &amp;lt;!-- For example: 0.6.1 --&amp;gt; &amp;lt;version&amp;gt;${ [...]
+      <description>Flink DataStream Reading with DataStream Amoro supports 
reading data in Batch or Streaming mode through Java API.&#xA;Batch mode Using 
Batch mode to read the full and incremental data in the 
FileStore.&#xA;Non-primary key tables support reading full data in batch mode, 
snapshot data with a specified snapshot-id or timestamp, and incremental data 
with a specified snapshot interval. The primary key table temporarily only 
supports reading the current full amount and later [...]
     </item>
     <item>
       <title>Flink DDL</title>
diff --git a/output/docs/0.6.1/flink-datastream/index.html 
b/output/docs/0.6.1/flink-datastream/index.html
index 65eacdc..f1ee51e 100644
--- a/output/docs/0.6.1/flink-datastream/index.html
+++ b/output/docs/0.6.1/flink-datastream/index.html
@@ -539,20 +539,7 @@
             
             <div id="content" class="markdown-body">
                 <div class="margin-for-toc"><h1 id="flink-datastream">Flink 
DataStream</h1>
-<h2 id="add-maven-dependency">Add maven dependency</h2>
-<p>To add a dependency on Mixed-format flink connector in Maven, add the 
following to your pom.xml:</p>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-xml" data-lang="xml"><span style="display:flex;"><span><span 
style="color:#f92672">&lt;dependencies&gt;</span>
-</span></span><span style="display:flex;"><span>  ...
-</span></span><span style="display:flex;"><span>  <span 
style="color:#f92672">&lt;dependency&gt;</span>
-</span></span><span style="display:flex;"><span>    <span 
style="color:#f92672">&lt;groupId&gt;</span>com.netease.arctic<span 
style="color:#f92672">&lt;/groupId&gt;</span>
-</span></span><span style="display:flex;"><span>    <span 
style="color:#75715e">&lt;!-- For example: amoro-flink-1.15 --&gt;</span>
-</span></span><span style="display:flex;"><span>    <span 
style="color:#f92672">&lt;artifactId&gt;</span>amoro-flink-${flink.minor-version}<span
 style="color:#f92672">&lt;/artifactId&gt;</span>
-</span></span><span style="display:flex;"><span>    <span 
style="color:#75715e">&lt;!-- For example: 0.6.1 --&gt;</span>
-</span></span><span style="display:flex;"><span>    <span 
style="color:#f92672">&lt;version&gt;</span>${amoro-mixed-format-flink.version}<span
 style="color:#f92672">&lt;/version&gt;</span>
-</span></span><span style="display:flex;"><span>  <span 
style="color:#f92672">&lt;/dependency&gt;</span>
-</span></span><span style="display:flex;"><span>  ...
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">&lt;/dependencies&gt;</span>
-</span></span></code></pre></div><h2 id="reading-with-datastream">Reading with 
DataStream</h2>
+<h2 id="reading-with-datastream">Reading with DataStream</h2>
 <p>Amoro supports reading data in Batch or Streaming mode through Java API.</p>
 <h3 id="batch-mode">Batch mode</h3>
 <p>Using Batch mode to read the full and incremental data in the FileStore.</p>
@@ -560,46 +547,32 @@
 <li>Non-primary key tables support reading full data in batch mode, snapshot 
data with a specified snapshot-id or timestamp, and incremental data with a 
specified snapshot interval.</li>
 <li>The primary key table temporarily only supports reading the current full 
amount and later CDC data.</li>
 </ul>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.table.FlinkSource;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> java.util.HashMap;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> java.util.Map;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> 
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span>    <span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span> 
<span style="color:#66d9ef">void</span> <span 
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span> 
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>        
StreamExecutionEnvironment env <span style="color:#f92672">=</span> 
StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span>        InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableIdentifier 
tableId <span style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
-</span></span><span style="display:flex;"><span>        ArcticTableLoader 
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        Map<span 
style="color:#f92672">&lt;</span>String, String<span 
style="color:#f92672">&gt;</span> properties <span 
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> 
HashMap<span style="color:#f92672">&lt;&gt;</span>();
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Default is true</span>
-</span></span><span style="display:flex;"><span>        properties.<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;streaming&#34;</span>, <span 
style="color:#e6db74">&#34;false&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
batch <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
FlinkSource.<span style="color:#a6e22e">forRowData</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">env</span>(env)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">properties</span>(properties)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// print all data read</span>
-</span></span><span style="display:flex;"><span>        batch.<span 
style="color:#a6e22e">print</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span>        env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Mixed-format table batch read&#34;</span>);
-</span></span><span style="display:flex;"><span>    }
-</span></span><span style="display:flex;"><span>}
+<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span 
style="display:flex;"><span>StreamExecutionEnvironment env <span 
style="color:#f92672">=</span> StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
+</span></span><span style="display:flex;"><span>    InternalCatalogBuilder
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span 
style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader 
<span style="color:#f92672">=</span> AmoroTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>Map<span 
style="color:#f92672">&lt;</span>String, String<span 
style="color:#f92672">&gt;</span> properties <span 
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> 
HashMap<span style="color:#f92672">&lt;&gt;</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
 Default is true.</span>
+</span></span><span style="display:flex;"><span>properties.<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;streaming&#34;</span>, <span 
style="color:#e6db74">&#34;false&#34;</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
batch <span style="color:#f92672">=</span> 
+</span></span><span style="display:flex;"><span>    FlinkSource.<span 
style="color:#a6e22e">forRowData</span>()
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">env</span>(env)
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// The primary key table only supports reading the 
current full amount and later CDC data temporarily, without the properties 
parameter.</span>
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">properties</span>(properties)
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Print all data read</span>
+</span></span><span style="display:flex;"><span>batch.<span 
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Amoro Batch Read&#34;</span>);
 </span></span></code></pre></div><p>The map properties contain below keys, 
<strong>currently only valid for non-primary key tables</strong>:</p>
 <table>
 <thead>
@@ -652,188 +625,132 @@
 <h3 id="streaming-mode">Streaming mode</h3>
 <p>Amoro supports reading incremental data in FileStore or LogStore through 
Java API in Streaming mode</p>
 <h3 id="streaming-mode-logstore">Streaming mode (LogStore)</h3>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.read.source.log.kafka.LogKafkaSource;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.util.ArcticUtils;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.ArcticTable;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.iceberg.Schema;
+<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span 
style="display:flex;"><span>StreamExecutionEnvironment env <span 
style="color:#f92672">=</span> StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span> 
+</span></span><span style="display:flex;"><span>    InternalCatalogBuilder
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
 </span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span 
style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader 
<span style="color:#f92672">=</span> AmoroTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> 
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span>    <span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span> 
<span style="color:#66d9ef">void</span> <span 
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span> 
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>        
StreamExecutionEnvironment env <span style="color:#f92672">=</span> 
StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span>        InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
+</span></span><span style="display:flex;"><span>AmoroTable table <span 
style="color:#f92672">=</span> AmoroUtils.<span 
style="color:#a6e22e">load</span>(tableLoader);
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Read all table fields. To read only some fields, you can construct the 
schema yourself, for example:</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Schema userSchema = new Schema(new ArrayList&lt;Types.NestedField&gt;() 
{{</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
  add(Types.NestedField.optional(0, &#34;f_boolean&#34;, 
Types.BooleanType.get()));</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
  add(Types.NestedField.optional(1, &#34;f_int&#34;, 
Types.IntegerType.get()));</span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
}});</span>
+</span></span><span style="display:flex;"><span>Schema schema <span 
style="color:#f92672">=</span> table.<span 
style="color:#a6e22e">schema</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableIdentifier 
tableId <span style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
-</span></span><span style="display:flex;"><span>        ArcticTableLoader 
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
-----------Hidden Kafka--------------</span>
+</span></span><span style="display:flex;"><span>LogKafkaSource source <span 
style="color:#f92672">=</span> LogKafkaSource.<span 
style="color:#a6e22e">builder</span>(schema, table.<span 
style="color:#a6e22e">properties</span>()).<span 
style="color:#a6e22e">build</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        ArcticTable table 
<span style="color:#f92672">=</span> ArcticUtils.<span 
style="color:#a6e22e">loadArcticTable</span>(tableLoader);
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Read table All fields. If you only read some fields, 
you can construct the schema yourself, for example:</span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Schema userSchema = new Schema(new 
ArrayList&lt;Types.NestedField&gt;() {{</span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">//   add(Types.NestedField.optional(0, 
&#34;f_boolean&#34;, Types.BooleanType.get()));</span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">//   add(Types.NestedField.optional(1, &#34;f_int&#34;, 
Types.IntegerType.get()));</span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// }});</span>
-</span></span><span style="display:flex;"><span>        Schema schema <span 
style="color:#f92672">=</span> table.<span 
style="color:#a6e22e">schema</span>();
+</span></span><span style="display:flex;"><span>or
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// -----------Hidden Kafka--------------</span>
-</span></span><span style="display:flex;"><span>        LogKafkaSource source 
<span style="color:#f92672">=</span> LogKafkaSource.<span 
style="color:#a6e22e">builder</span>(schema, table.<span 
style="color:#a6e22e">properties</span>()).<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
-----------Hidden Pulsar--------------</span>
+</span></span><span style="display:flex;"><span>LogPulsarSource source <span 
style="color:#f92672">=</span> LogPulsarSource.<span 
style="color:#a6e22e">builder</span>(schema, table.<span 
style="color:#a6e22e">properties</span>()).<span 
style="color:#a6e22e">build</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
stream <span style="color:#f92672">=</span> env.<span 
style="color:#a6e22e">fromSource</span>(source, WatermarkStrategy.<span 
style="color:#a6e22e">noWatermarks</span>(), <span 
style="color:#e6db74">&#34;Log Source&#34;</span>);
+</span></span><span style="display:flex;"><span>DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
stream <span style="color:#f92672">=</span> env.<span 
style="color:#a6e22e">fromSource</span>(source, WatermarkStrategy.<span 
style="color:#a6e22e">noWatermarks</span>(), <span 
style="color:#e6db74">&#34;Log Source&#34;</span>);
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Print all the read data</span>
-</span></span><span style="display:flex;"><span>        stream.<span 
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Print all the read data</span>
+</span></span><span style="display:flex;"><span>stream.<span 
style="color:#a6e22e">print</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span>        env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Mixed-format table streaming read&#34;</span>);
-</span></span><span style="display:flex;"><span>    }
-</span></span><span style="display:flex;"><span>}
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Amoro Stream Read&#34;</span>);
 </span></span></code></pre></div><h3 id="streaming-mode-filestore">Streaming 
mode (FileStore)</h3>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.table.FlinkSource;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> java.util.HashMap;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> java.util.Map;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> 
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span>    <span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span> 
<span style="color:#66d9ef">void</span> <span 
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span> 
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>        
StreamExecutionEnvironment env <span style="color:#f92672">=</span> 
StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span>        InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableIdentifier 
tableId <span style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
-</span></span><span style="display:flex;"><span>        ArcticTableLoader 
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        Map<span 
style="color:#f92672">&lt;</span>String, String<span 
style="color:#f92672">&gt;</span> properties <span 
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> 
HashMap<span style="color:#f92672">&lt;&gt;</span>();
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Default value is true</span>
-</span></span><span style="display:flex;"><span>        properties.<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;streaming&#34;</span>, <span 
style="color:#e6db74">&#34;true&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
stream <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
FlinkSource.<span style="color:#a6e22e">forRowData</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">env</span>(env)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">properties</span>(properties)
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Print all read data</span>
-</span></span><span style="display:flex;"><span>        stream.<span 
style="color:#a6e22e">print</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span>        env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Mixed-format table streaming Read&#34;</span>);
-</span></span><span style="display:flex;"><span>    }
-</span></span><span style="display:flex;"><span>} 
+<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span 
style="display:flex;"><span>StreamExecutionEnvironment env <span 
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span 
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader 
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>Map<span 
style="color:#f92672">&lt;</span>String, String<span 
style="color:#f92672">&gt;</span> properties <span 
style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> 
HashMap<span style="color:#f92672">&lt;&gt;</span>();
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
default is true </span>
+</span></span><span style="display:flex;"><span>properties.<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;streaming&#34;</span>, <span 
style="color:#e6db74">&#34;true&#34;</span>);
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
stream <span style="color:#f92672">=</span> 
+</span></span><span style="display:flex;"><span>    FlinkSource.<span 
style="color:#a6e22e">forRowData</span>()
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">env</span>(env)
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Primary key tables currently support reading only the 
current full data and subsequent CDC data; they do not need the 
properties parameter</span>
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">properties</span>(properties)
+</span></span><span style="display:flex;"><span>        .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Print all read data</span>
+</span></span><span style="display:flex;"><span>stream.<span 
style="color:#a6e22e">print</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Amoro Stream Read&#34;</span>);
 </span></span></code></pre></div><p>The DataStream API supports reading both 
primary key tables and non-primary key tables. For the configuration items 
supported by properties, see the <a href="../flink-dml/">Hint Options 
chapter</a> of Querying With SQL.</p>
 <h2 id="writing-with-datastream">Writing with DataStream</h2>
 <p>Amoro tables support writing data to LogStore or FileStore through the Java 
API.</p>
 <h3 id="overwrite-data">Overwrite data</h3>
 <p>Amoro currently supports dynamic overwrite of existing data only for 
non-primary key tables.</p>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.write.FlinkSink;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.api.DataTypes;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.api.TableSchema;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> 
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span>    <span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span> 
<span style="color:#66d9ef">void</span> <span 
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span> 
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Build your data stream</span>
-</span></span><span style="display:flex;"><span>        DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
input <span style="color:#f92672">=</span> <span 
style="color:#66d9ef">null</span>;
-</span></span><span style="display:flex;"><span>        
StreamExecutionEnvironment env <span style="color:#f92672">=</span> 
StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span>        InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableIdentifier 
tableId <span style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
-</span></span><span style="display:flex;"><span>        ArcticTableLoader 
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableSchema 
flinkSchema <span style="color:#f92672">=</span> TableSchema.<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;id&#34;</span>, DataTypes.<span 
style="color:#a6e22e">INT</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;name&#34;</span>, DataTypes.<span 
style="color:#a6e22e">STRING</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;op_time&#34;</span>, DataTypes.<span 
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        FlinkSink
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">forRowData</span>(input)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">overwrite</span>(<span style="color:#66d9ef">true</span>)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">flinkSchema</span>(flinkSchema)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">build</span>();
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Submit and execute the task</span>
-</span></span><span style="display:flex;"><span>        env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Mixed-format table overwrite&#34;</span>);
-</span></span><span style="display:flex;"><span>    }
-</span></span><span style="display:flex;"><span>}
+<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span 
style="display:flex;"><span>DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
input <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span 
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader 
<span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>TableSchema FLINK_SCHEMA <span 
style="color:#f92672">=</span> TableSchema.<span 
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;id&#34;</span>, DataTypes.<span 
style="color:#a6e22e">INT</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;name&#34;</span>, DataTypes.<span 
style="color:#a6e22e">STRING</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;op_time&#34;</span>, DataTypes.<span 
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span>FlinkSink
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">forRowData</span>(input)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">overwrite</span>(<span style="color:#66d9ef">true</span>)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">flinkSchema</span>(FLINK_SCHEMA)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>
+</span></span><span style="display:flex;"><span><span style="color:#75715e">// 
Submit and execute the task</span>
+</span></span><span style="display:flex;"><span>env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Amoro Overwrite&#34;</span>);
 </span></span></code></pre></div><h3 id="appending-data">Appending data</h3>
 <p>Through the Java API, you can specify whether data written to an Amoro 
table goes to FileStore or LogStore.</p>
-<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.InternalCatalogBuilder;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
com.netease.arctic.flink.table.ArcticTableLoader;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.util.ArcticUtils;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.flink.write.FlinkSink;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.ArcticTable;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> com.netease.arctic.table.TableIdentifier;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.datastream.DataStream;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.api.DataTypes;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.api.TableSchema;
-</span></span><span style="display:flex;"><span><span 
style="color:#f92672">import</span> org.apache.flink.table.data.RowData;
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span><span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> 
<span style="color:#a6e22e">Main</span> {
-</span></span><span style="display:flex;"><span>    <span 
style="color:#66d9ef">public</span> <span style="color:#66d9ef">static</span> 
<span style="color:#66d9ef">void</span> <span 
style="color:#a6e22e">main</span>(String<span style="color:#f92672">[]</span> 
args) <span style="color:#66d9ef">throws</span> Exception {
-</span></span><span style="display:flex;"><span>        <span 
style="color:#75715e">// Build your data stream</span>
-</span></span><span style="display:flex;"><span>        DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
input <span style="color:#f92672">=</span> <span 
style="color:#66d9ef">null</span>;
-</span></span><span style="display:flex;"><span>        
StreamExecutionEnvironment env <span style="color:#f92672">=</span> 
StreamExecutionEnvironment.<span 
style="color:#a6e22e">createLocalEnvironment</span>();
-</span></span><span style="display:flex;"><span>        InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span>
-</span></span><span style="display:flex;"><span>                
InternalCatalogBuilder
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                        .<span 
style="color:#a6e22e">metastoreUrl</span>(<span 
style="color:#e6db74">&#34;thrift://&lt;url&gt;:&lt;port&gt;/&lt;catalog_name&gt;&#34;</span>);
-</span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableIdentifier 
tableId <span style="color:#f92672">=</span> TableIdentifier.<span 
style="color:#a6e22e">of</span>(<span 
style="color:#e6db74">&#34;catalog_name&#34;</span>, <span 
style="color:#e6db74">&#34;database_name&#34;</span>, <span 
style="color:#e6db74">&#34;test_table&#34;</span>);
-</span></span><span style="display:flex;"><span>        ArcticTableLoader 
tableLoader <span style="color:#f92672">=</span> ArcticTableLoader.<span 
style="color:#a6e22e">of</span>(tableId, catalogBuilder);
+<div class="highlight"><pre tabindex="0" 
style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code
 class="language-java" data-lang="java"><span 
style="display:flex;"><span>DataStream<span 
style="color:#f92672">&lt;</span>RowData<span style="color:#f92672">&gt;</span> 
input <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>InternalCatalogBuilder 
catalogBuilder <span style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>TableIdentifier tableId <span 
style="color:#f92672">=</span> ...;
+</span></span><span style="display:flex;"><span>AmoroTableLoader tableLoader 
<span style="color:#f92672">=</span> ...;
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        TableSchema 
flinkSchema <span style="color:#f92672">=</span> TableSchema.<span 
style="color:#a6e22e">builder</span>()
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;id&#34;</span>, DataTypes.<span 
style="color:#a6e22e">INT</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;name&#34;</span>, DataTypes.<span 
style="color:#a6e22e">STRING</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;op_time&#34;</span>, DataTypes.<span 
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>TableSchema FLINK_SCHEMA <span 
style="color:#f92672">=</span> TableSchema.<span 
style="color:#a6e22e">builder</span>()
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;id&#34;</span>, DataTypes.<span 
style="color:#a6e22e">INT</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;name&#34;</span>, DataTypes.<span 
style="color:#a6e22e">STRING</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">field</span>(<span 
style="color:#e6db74">&#34;op_time&#34;</span>, DataTypes.<span 
style="color:#a6e22e">TIMESTAMP_WITH_LOCAL_TIME_ZONE</span>())
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">build</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        ArcticTable table 
<span style="color:#f92672">=</span> ArcticUtils.<span 
style="color:#a6e22e">loadArcticTable</span>(tableLoader);
+</span></span><span style="display:flex;"><span>AmoroTable table <span 
style="color:#f92672">=</span> AmoroUtils.<span 
style="color:#a6e22e">loadAmoroTable</span>(tableLoader);
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        table.<span 
style="color:#a6e22e">properties</span>().<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;arctic.emit.mode&#34;</span>, <span 
style="color:#e6db74">&#34;log,file&#34;</span>);
+</span></span><span style="display:flex;"><span>table.<span 
style="color:#a6e22e">properties</span>().<span 
style="color:#a6e22e">put</span>(<span 
style="color:#e6db74">&#34;arctic.emit.mode&#34;</span>, <span 
style="color:#e6db74">&#34;log,file&#34;</span>);
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        FlinkSink
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">forRowData</span>(input)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">table</span>(table)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">flinkSchema</span>(flinkSchema)
-</span></span><span style="display:flex;"><span>                .<span 
style="color:#a6e22e">build</span>();
+</span></span><span style="display:flex;"><span>FlinkSink
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">forRowData</span>(input)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">table</span>(table)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">tableLoader</span>(tableLoader)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">flinkSchema</span>(FLINK_SCHEMA)
+</span></span><span style="display:flex;"><span>    .<span 
style="color:#a6e22e">build</span>();
 </span></span><span style="display:flex;"><span>
-</span></span><span style="display:flex;"><span>        env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Mixed-format table append&#34;</span>);
-</span></span><span style="display:flex;"><span>    }
-</span></span><span style="display:flex;"><span>}
+</span></span><span style="display:flex;"><span>env.<span 
style="color:#a6e22e">execute</span>(<span style="color:#e6db74">&#34;Test 
Amoro Append&#34;</span>);
 </span></span></code></pre></div><p>The DataStream API supports writing to both 
primary key tables and non-primary key tables. For the configuration items 
supported by properties, see the <a 
href="../flink-dml/">Hint Options chapter</a> of Writing With SQL.</p>
 <blockquote>
 <p><strong>TIPS</strong></p>
@@ -861,7 +778,6 @@
     <div id="full">
         <nav id="TableOfContents">
   <ul>
-    <li><a href="#add-maven-dependency">Add maven dependency</a></li>
     <li><a href="#reading-with-datastream">Reading with DataStream</a>
       <ul>
         <li><a href="#batch-mode">Batch mode</a></li>
diff --git a/output/docs/0.6.1/index.html b/output/docs/0.6.1/index.html
index 7b7d037..286741c 100644
--- a/output/docs/0.6.1/index.html
+++ b/output/docs/0.6.1/index.html
@@ -32,7 +32,7 @@
 <!DOCTYPE html>
 <html>
 <head>
-       <meta name="generator" content="Hugo 0.127.0">
+       <meta name="generator" content="Hugo 0.130.0">
 
     <meta charset="utf-8">
     <meta http-equiv="X-UA-Compatible" content="IE=edge">
diff --git a/output/docs/0.6.1/index.xml b/output/docs/0.6.1/index.xml
index a68296f..eb815ce 100644
--- a/output/docs/0.6.1/index.xml
+++ b/output/docs/0.6.1/index.xml
@@ -61,7 +61,7 @@
       <link>https://amoro.apache.org/docs/0.6.1/flink-datastream/</link>
       <pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate>
       <guid>https://amoro.apache.org/docs/0.6.1/flink-datastream/</guid>
-      <description>Flink DataStream Add maven dependency To add a dependency 
on Mixed-format flink connector in Maven, add the following to your 
pom.xml:&#xA;&amp;lt;dependencies&amp;gt; ... &amp;lt;dependency&amp;gt; 
&amp;lt;groupId&amp;gt;com.netease.arctic&amp;lt;/groupId&amp;gt; &amp;lt;!-- 
For example: amoro-flink-1.15 --&amp;gt; 
&amp;lt;artifactId&amp;gt;amoro-flink-${flink.minor-version}&amp;lt;/artifactId&amp;gt;
 &amp;lt;!-- For example: 0.6.1 --&amp;gt; &amp;lt;version&amp;gt;${ [...]
+      <description>Flink DataStream Reading with DataStream Amoro supports 
reading data in Batch or Streaming mode through Java API.&#xA;Batch mode Using 
Batch mode to read the full and incremental data in the 
FileStore.&#xA;Non-primary key tables support reading full data in batch mode, 
snapshot data with a specified snapshot-id or timestamp, and incremental data 
with a specified snapshot interval. The primary key table temporarily only 
supports reading the current full amount and later [...]
     </item>
     <item>
       <title>Flink DDL</title>
diff --git a/output/docs/0.6.1/search.json b/output/docs/0.6.1/search.json
index 92941a2..2082274 100644
--- a/output/docs/0.6.1/search.json
+++ b/output/docs/0.6.1/search.json
@@ -1 +1 @@
-[{"categories":null,"content":"Paimon Format Paimon format refers to Apache 
Paimon table. Paimon is a streaming data lake platform with high-speed data 
ingestion, changelog tracking and efficient real-time analytics.\nBy 
registering Paimon’s catalog with Amoro, users can view information such as 
Schema, Options, Files, Snapshots, DDLs, Compaction information, and more for 
Paimon tables. Furthermore, they can operate on Paimon tables using Spark SQL 
in the Terminal. The current supported  [...]
\ No newline at end of file
+[{"categories":null,"content":"Paimon Format Paimon format refers to Apache 
Paimon table. Paimon is a streaming data lake platform with high-speed data 
ingestion, changelog tracking and efficient real-time analytics.\nBy 
registering Paimon’s catalog with Amoro, users can view information such as 
Schema, Options, Files, Snapshots, DDLs, Compaction information, and more for 
Paimon tables. Furthermore, they can operate on Paimon tables using Spark SQL 
in the Terminal. The current supported  [...]
\ No newline at end of file
