This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4015fa03b [flink] flink web ui add write-only tag to writer(which is 
write only) (#1623)
4015fa03b is described below

commit 4015fa03b7be942fc0f120c1919663620f475c3b
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jul 21 19:05:07 2023 +0800

    [flink] flink web ui add write-only tag to writer(which is write only) 
(#1623)
---
 .../src/main/java/org/apache/paimon/flink/sink/FlinkSink.java       | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 3ad1cc0a7..1f586aefa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -56,6 +56,7 @@ public abstract class FlinkSink<T> implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private static final String WRITER_NAME = "Writer";
+    private static final String WRITER_WRITE_ONLY_NAME = "Writer(write-only)";
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
     protected final FileStoreTable table;
@@ -145,9 +146,12 @@ public abstract class FlinkSink<T> implements Serializable 
{
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
+        Boolean writeOnly = table.coreOptions().writeOnly();
         SingleOutputStreamOperator<Committable> written =
                 input.transform(
-                                WRITER_NAME + " -> " + table.name(),
+                                (writeOnly ? WRITER_WRITE_ONLY_NAME : 
WRITER_NAME)
+                                        + " -> "
+                                        + table.name(),
                                 new CommittableTypeInfo(),
                                 createWriteOperator(
                                         
createWriteProvider(env.getCheckpointConfig(), isStreaming),

Reply via email to