This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8418e1511 [Flink] Support numBytesSend metrics for paimon sink (#1739)
8418e1511 is described below

commit 8418e1511f17fbcf4ee632767fd02eae71eaf95d
Author: GuojunLi <[email protected]>
AuthorDate: Wed Aug 9 19:56:04 2023 +0800

    [Flink] Support numBytesSend metrics for paimon sink (#1739)
    
    This closes #1739.
---
 .../org/apache/paimon/flink/sink/Committer.java    |  5 ++-
 .../paimon/flink/sink/CommitterOperator.java       |  2 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   | 39 +++++++++++++++++++++-
 .../paimon/flink/sink/StoreMultiCommitter.java     |  6 ++--
 .../paimon/flink/sink/CommitterOperatorTest.java   | 28 ++++++++++++++++
 5 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 482e1f098..470bf9e53 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -19,6 +19,8 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -37,7 +39,8 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
             throws IOException;
 
     /** Commits the given {@link GlobalCommitT}. */
-    void commit(List<GlobalCommitT> globalCommittables) throws IOException, InterruptedException;
+    void commit(List<GlobalCommitT> globalCommittables, OperatorIOMetricGroup metricGroup)
+            throws IOException, InterruptedException;
 
     /**
      * Filter out all {@link GlobalCommitT} which have committed, and commit the remaining {@link
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 66dcd2ce1..c0e9f62e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -154,7 +154,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, GlobalCommitT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        committer.commit(committables(headMap));
+        committer.commit(committables(headMap), getMetricGroup().getIOMetricGroup());
         headMap.clear();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index dac073ec7..bf153fac8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.TableCommit;
 import org.apache.paimon.table.sink.TableCommitImpl;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,9 +66,12 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     }
 
     @Override
-    public void commit(List<ManifestCommittable> committables)
+    public void commit(List<ManifestCommittable> committables, OperatorIOMetricGroup metricGroup)
             throws IOException, InterruptedException {
         commit.commitMultiple(committables);
+        Tuple2<Long, Long> numBytesAndRecords = calcDataBytesAndRecordsSend(committables);
+        metricGroup.getNumBytesOutCounter().inc(numBytesAndRecords.f0);
+        metricGroup.getNumRecordsOutCounter().inc(numBytesAndRecords.f1);
     }
 
     @Override
@@ -83,4 +92,32 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     public void close() throws Exception {
         commit.close();
     }
+
+    @VisibleForTesting
+    static Tuple2<Long, Long> calcDataBytesAndRecordsSend(List<ManifestCommittable> committables) {
+        long bytesSend = 0;
+        long recordsSend = 0;
+        for (ManifestCommittable committable : committables) {
+            List<CommitMessage> commitMessages = committable.fileCommittables();
+            for (CommitMessage commitMessage : commitMessages) {
+                long dataFileSizeInc =
+                        calcTotalFileSize(
+                                ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles());
+                long dataFileRowCountInc =
+                        calcTotalFileRowCount(
+                                ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles());
+                bytesSend += dataFileSizeInc;
+                recordsSend += dataFileRowCountInc;
+            }
+        }
+        return Tuple2.of(bytesSend, recordsSend);
+    }
+
+    private static long calcTotalFileSize(List<DataFileMeta> files) {
+        return files.stream().mapToLong(f -> f.fileSize()).reduce(Long::sum).orElse(0);
+    }
+
+    private static long calcTotalFileRowCount(List<DataFileMeta> files) {
+        return files.stream().mapToLong(f -> f.rowCount()).reduce(Long::sum).orElse(0);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 1239d06ee..4ec386663 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -84,7 +85,8 @@ public class StoreMultiCommitter
     }
 
     @Override
-    public void commit(List<WrappedManifestCommittable> committables)
+    public void commit(
+            List<WrappedManifestCommittable> committables, OperatorIOMetricGroup metricGroup)
             throws IOException, InterruptedException {
 
         // key by table id
@@ -94,7 +96,7 @@ public class StoreMultiCommitter
             Identifier tableId = entry.getKey();
             List<ManifestCommittable> committableList = entry.getValue();
             StoreCommitter committer = getStoreCommitter(tableId);
-            committer.commit(committableList);
+            committer.commit(committableList, metricGroup);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index a874e87a2..74d8561bd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.utils.SnapshotManager;
@@ -35,6 +36,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -45,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -322,6 +325,31 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
     }
 
+    @Test
+    public void testCalcDataBytesSend() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        StreamTableWrite write = table.newWrite(initialCommitUser);
+        StreamTableCommit commit = table.newCommit(initialCommitUser);
+
+        write.write(GenericRow.of(1, 10L));
+        write.write(GenericRow.of(1, 20L));
+        List<CommitMessage> committable = write.prepareCommit(false, 0);
+
+        commit.commit(0, committable);
+
+        ManifestCommittable manifestCommittable = new ManifestCommittable(0);
+        for (CommitMessage commitMessage : committable) {
+            manifestCommittable.addFileCommittable(commitMessage);
+        }
+        write.close();
+        Tuple2<Long, Long> numBytesAndRecords =
+                StoreCommitter.calcDataBytesAndRecordsSend(
+                        new ArrayList<>(Arrays.asList(manifestCommittable)));
+        assertThat(numBytesAndRecords.f0).isEqualTo(275);
+        assertThat(numBytesAndRecords.f1).isEqualTo(2);
+    }
+
     // ------------------------------------------------------------------------
     //  Test utils
     // ------------------------------------------------------------------------

Reply via email to