This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8418e1511 [Flink] Support numBytesSend metrics for paimon sink (#1739)
8418e1511 is described below
commit 8418e1511f17fbcf4ee632767fd02eae71eaf95d
Author: GuojunLi <[email protected]>
AuthorDate: Wed Aug 9 19:56:04 2023 +0800
[Flink] Support numBytesSend metrics for paimon sink (#1739)
This closes #1739.
---
.../org/apache/paimon/flink/sink/Committer.java | 5 ++-
.../paimon/flink/sink/CommitterOperator.java | 2 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 39 +++++++++++++++++++++-
.../paimon/flink/sink/StoreMultiCommitter.java | 6 ++--
.../paimon/flink/sink/CommitterOperatorTest.java | 28 ++++++++++++++++
5 files changed, 75 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 482e1f098..470bf9e53 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -19,6 +19,8 @@
package org.apache.paimon.flink.sink;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -37,7 +39,8 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
throws IOException;
/** Commits the given {@link GlobalCommitT}. */
- void commit(List<GlobalCommitT> globalCommittables) throws IOException,
InterruptedException;
+ void commit(List<GlobalCommitT> globalCommittables, OperatorIOMetricGroup
metricGroup)
+ throws IOException, InterruptedException;
/**
* Filter out all {@link GlobalCommitT} which have committed, and commit
the remaining {@link
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 66dcd2ce1..c0e9f62e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -154,7 +154,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- committer.commit(committables(headMap));
+ committer.commit(committables(headMap),
getMetricGroup().getIOMetricGroup());
headMap.clear();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index dac073ec7..bf153fac8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,11 +18,17 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -60,9 +66,12 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
}
@Override
- public void commit(List<ManifestCommittable> committables)
+ public void commit(List<ManifestCommittable> committables,
OperatorIOMetricGroup metricGroup)
throws IOException, InterruptedException {
commit.commitMultiple(committables);
+ Tuple2<Long, Long> numBytesAndRecords =
calcDataBytesAndRecordsSend(committables);
+ metricGroup.getNumBytesOutCounter().inc(numBytesAndRecords.f0);
+ metricGroup.getNumRecordsOutCounter().inc(numBytesAndRecords.f1);
}
@Override
@@ -83,4 +92,32 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
public void close() throws Exception {
commit.close();
}
+
+    /**
+     * Sums the size in bytes and the row count of the newly written data files carried by the
+     * given committables, so they can be reported through the operator's IO metrics.
+     *
+     * <p>Only {@code newFilesIncrement().newFiles()} of each message is counted. Messages that are
+     * not {@link CommitMessageImpl} carry nothing this method can measure and are skipped rather
+     * than cast blindly: this runs after {@code commitMultiple} has already succeeded, so a
+     * {@link ClassCastException} here would fail an otherwise successful commit.
+     *
+     * @param committables committables whose file messages are inspected
+     * @return a tuple of (total bytes, total records); (0, 0) when nothing was written
+     */
+    @VisibleForTesting
+    static Tuple2<Long, Long> calcDataBytesAndRecordsSend(List<ManifestCommittable> committables) {
+        long bytesSend = 0;
+        long recordsSend = 0;
+        for (ManifestCommittable committable : committables) {
+            for (CommitMessage commitMessage : committable.fileCommittables()) {
+                if (!(commitMessage instanceof CommitMessageImpl)) {
+                    // Unknown message implementation: nothing measurable, do not fail the commit.
+                    continue;
+                }
+                // Cast and fetch the new-file list once, then derive both metrics from it.
+                List<DataFileMeta> newFiles =
+                        ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles();
+                bytesSend += calcTotalFileSize(newFiles);
+                recordsSend += calcTotalFileRowCount(newFiles);
+            }
+        }
+        return Tuple2.of(bytesSend, recordsSend);
+    }
+
+    /** Returns the sum of {@link DataFileMeta#fileSize()} over {@code files}; 0 when empty. */
+    private static long calcTotalFileSize(List<DataFileMeta> files) {
+        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+    }
+
+    /** Returns the sum of {@link DataFileMeta#rowCount()} over {@code files}; 0 when empty. */
+    private static long calcTotalFileRowCount(List<DataFileMeta> files) {
+        return files.stream().mapToLong(DataFileMeta::rowCount).sum();
+    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 1239d06ee..4ec386663 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import java.io.IOException;
import java.util.ArrayList;
@@ -84,7 +85,8 @@ public class StoreMultiCommitter
}
@Override
- public void commit(List<WrappedManifestCommittable> committables)
+ public void commit(
+ List<WrappedManifestCommittable> committables,
OperatorIOMetricGroup metricGroup)
throws IOException, InterruptedException {
// key by table id
@@ -94,7 +96,7 @@ public class StoreMultiCommitter
Identifier tableId = entry.getKey();
List<ManifestCommittable> committableList = entry.getValue();
StoreCommitter committer = getStoreCommitter(tableId);
- committer.commit(committableList);
+ committer.commit(committableList, metricGroup);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index a874e87a2..74d8561bd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.utils.SnapshotManager;
@@ -35,6 +36,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -45,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -322,6 +325,31 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
}
+    /**
+     * Verifies that {@link StoreCommitter#calcDataBytesAndRecordsSend} sums the byte size and row
+     * count of the data files produced by writing two rows.
+     */
+    @Test
+    public void testCalcDataBytesSend() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        List<CommitMessage> committable;
+        // try-with-resources so both the writer and the committer are closed even on failure.
+        try (StreamTableWrite write = table.newWrite(initialCommitUser);
+                StreamTableCommit commit = table.newCommit(initialCommitUser)) {
+            write.write(GenericRow.of(1, 10L));
+            write.write(GenericRow.of(1, 20L));
+            committable = write.prepareCommit(false, 0);
+            commit.commit(0, committable);
+        }
+
+        ManifestCommittable manifestCommittable = new ManifestCommittable(0);
+        committable.forEach(manifestCommittable::addFileCommittable);
+
+        Tuple2<Long, Long> numBytesAndRecords =
+                StoreCommitter.calcDataBytesAndRecordsSend(Arrays.asList(manifestCommittable));
+        // NOTE(review): 275 is the expected on-disk size of the file(s) written above; it is tied
+        // to the table's file format/encoding and will need updating if that changes.
+        assertThat(numBytesAndRecords.f0).isEqualTo(275);
+        assertThat(numBytesAndRecords.f1).isEqualTo(2);
+    }
+
// ------------------------------------------------------------------------
// Test utils
// ------------------------------------------------------------------------