This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 84babb66a [core] Add numWriters metrics (#3234)
84babb66a is described below
commit 84babb66ab78d06808399ec5dfee892b14125dae
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 16:06:48 2024 +0800
[core] Add numWriters metrics (#3234)
---
docs/content/maintenance/metrics.md | 5 +
.../paimon/operation/MemoryFileStoreWrite.java | 17 ++
.../operation/metrics/WriterBufferMetric.java | 13 ++
.../paimon/flink/sink/WriterOperatorTest.java | 220 ++++++++++++++-------
4 files changed, 185 insertions(+), 70 deletions(-)
diff --git a/docs/content/maintenance/metrics.md
b/docs/content/maintenance/metrics.md
index 5950c518e..203326c40 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -200,6 +200,11 @@ Below is a list of Paimon built-in metrics. They are
summarized into types of sca
</tr>
</thead>
<tbody>
+ <tr>
+ <td>numWriters</td>
+ <td>Gauge</td>
+ <td>Number of writers in this parallelism.</td>
+ </tr>
<tr>
<td>bufferPreemptCount</td>
<td>Gauge</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index fc5bd926a..514fb92af 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
/**
@@ -110,6 +112,7 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
+ " but this is: "
+ writer.getClass());
}
+
if (writeBufferPool == null) {
LOG.debug("Use default heap memory segment pool for write
buffer.");
writeBufferPool =
@@ -119,6 +122,10 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
.addOwners(this::memoryOwners);
}
writeBufferPool.notifyNewOwner((MemoryOwner) writer);
+
+ if (writerBufferMetric != null) {
+ writerBufferMetric.increaseNumWriters();
+ }
}
@Override
@@ -135,6 +142,16 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
}
}
+ @Override
+ public List<CommitMessage> prepareCommit(boolean waitCompaction, long
commitIdentifier)
+ throws Exception {
+ List<CommitMessage> result = super.prepareCommit(waitCompaction,
commitIdentifier);
+ if (writerBufferMetric != null) {
+
writerBufferMetric.setNumWriters(writers.values().stream().mapToInt(Map::size).sum());
+ }
+ return result;
+ }
+
@Override
public void close() throws Exception {
super.close();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index a89531bc4..d414383aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -22,6 +22,7 @@ import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -29,17 +30,21 @@ import java.util.function.Supplier;
public class WriterBufferMetric {
private static final String GROUP_NAME = "writerBuffer";
+ private static final String NUM_WRITERS = "numWriters";
private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
private static final String USED_WRITE_BUFFER_SIZE =
"usedWriteBufferSizeByte";
private static final String TOTAL_WRITE_BUFFER_SIZE =
"totalWriteBufferSizeByte";
private final MetricGroup metricGroup;
+ private final AtomicInteger numWriters;
public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
MetricRegistry metricRegistry,
String tableName) {
metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+ numWriters = new AtomicInteger(0);
+ metricGroup.gauge(NUM_WRITERS, numWriters::get);
metricGroup.gauge(
BUFFER_PREEMPT_COUNT,
() ->
@@ -62,6 +67,14 @@ public class WriterBufferMetric {
return memoryPoolFactory == null ? -1 :
function.apply(memoryPoolFactory);
}
+ public void increaseNumWriters() {
+ numWriters.incrementAndGet();
+ }
+
+ public void setNumWriters(int x) {
+ numWriters.set(x);
+ }
+
public void close() {
this.metricGroup.close();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index d294dad79..3a8c15571 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -60,6 +60,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -68,10 +70,12 @@ public class WriterOperatorTest {
@TempDir public java.nio.file.Path tempDir;
private Path tablePath;
+ private String commitUser;
@BeforeEach
public void before() {
tablePath = new Path(tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
}
@Test
@@ -111,22 +115,7 @@ public class WriterOperatorTest {
private void testMetricsImpl(FileStoreTable fileStoreTable) throws
Exception {
String tableName = tablePath.getName();
- RowDataStoreWriteOperator operator =
- new RowDataStoreWriteOperator(
- fileStoreTable,
- null,
- (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
- new StoreSinkWriteImpl(
- table,
- commitUser,
- state,
- ioManager,
- false,
- false,
- true,
- memoryPool,
- metricGroup),
- "test");
+ RowDataStoreWriteOperator operator =
getStoreSinkWriteOperator(fileStoreTable);
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operator);
@@ -188,7 +177,7 @@ public class WriterOperatorTest {
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operator);
- TableCommitImpl commit = fileStoreTable.newCommit("test");
+ TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
@@ -251,40 +240,6 @@ public class WriterOperatorTest {
.containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]",
"+I[3, 30, 301]");
}
- private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
- FileStoreTable fileStoreTable, boolean waitCompaction) {
- return new RowDataStoreWriteOperator(
- fileStoreTable,
- null,
- (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
- new AsyncLookupSinkWrite(
- table,
- commitUser,
- state,
- ioManager,
- false,
- waitCompaction,
- true,
- memoryPool,
- metricGroup),
- "test");
- }
-
- @SuppressWarnings("unchecked")
- private void commitAll(
- OneInputStreamOperatorTestHarness<InternalRow, Committable>
harness,
- TableCommitImpl commit,
- long commitIdentifier) {
- List<CommitMessage> commitMessages = new ArrayList<>();
- while (!harness.getOutput().isEmpty()) {
- Committable committable =
- ((StreamRecord<Committable>)
harness.getOutput().poll()).getValue();
- assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
- commitMessages.add((CommitMessage)
committable.wrappedCommittable());
- }
- commit.commit(commitIdentifier, commitMessages);
- }
-
@Test
public void testChangelog() throws Exception {
testChangelog(false);
@@ -308,28 +263,11 @@ public class WriterOperatorTest {
FileStoreTable fileStoreTable =
createFileStoreTable(
rowType, Arrays.asList("pt", "k"),
Collections.singletonList("k"), options);
-
- RowDataStoreWriteOperator operator =
- new RowDataStoreWriteOperator(
- fileStoreTable,
- null,
- (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
- new StoreSinkWriteImpl(
- table,
- commitUser,
- state,
- ioManager,
- false,
- false,
- true,
- memoryPool,
- metricGroup),
- "test");
-
+ RowDataStoreWriteOperator operator =
getStoreSinkWriteOperator(fileStoreTable);
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operator);
- TableCommitImpl commit = fileStoreTable.newCommit("test");
+ TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
@@ -380,6 +318,148 @@ public class WriterOperatorTest {
}
}
+ @Test
+ public void testNumWritersMetric() throws Exception {
+ String tableName = tablePath.getName();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.INT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set("bucket", "1");
+ options.set("write-buffer-size", "256 b");
+ options.set("page-size", "32 b");
+
+ FileStoreTable fileStoreTable =
+ createFileStoreTable(
+ rowType,
+ Arrays.asList("pt", "k"),
+ Collections.singletonList("pt"),
+ options);
+ TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+
+ RowDataStoreWriteOperator rowDataStoreWriteOperator =
+ getStoreSinkWriteOperator(fileStoreTable);
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ createHarness(rowDataStoreWriteOperator);
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ harness.setup(serializer);
+ harness.open();
+
+ OperatorMetricGroup metricGroup =
rowDataStoreWriteOperator.getMetricGroup();
+ MetricGroup writerBufferMetricGroup =
+ metricGroup
+ .addGroup("paimon")
+ .addGroup("table", tableName)
+ .addGroup("writerBuffer");
+
+ Gauge<Integer> numWriters =
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"numWriters");
+
+ // write into three partitions
+ harness.processElement(GenericRow.of(1, 10, 100), 1);
+ harness.processElement(GenericRow.of(2, 20, 200), 2);
+ harness.processElement(GenericRow.of(3, 30, 300), 3);
+ assertThat(numWriters.getValue()).isEqualTo(3);
+
+ // commit messages in three partitions, no writer should be cleaned
+ harness.prepareSnapshotPreBarrier(1);
+ harness.snapshot(1, 10);
+ harness.notifyOfCompletedCheckpoint(1);
+ commit.commit(
+ 1,
+ harness.extractOutputValues().stream()
+ .map(c -> (CommitMessage) c.wrappedCommittable())
+ .collect(Collectors.toList()));
+ assertThat(numWriters.getValue()).isEqualTo(3);
+
+ // write into two partitions
+ harness.processElement(GenericRow.of(1, 11, 110), 11);
+ harness.processElement(GenericRow.of(3, 13, 130), 13);
+ // checkpoint has not come yet, so no writer should be cleaned
+ assertThat(numWriters.getValue()).isEqualTo(3);
+
+ // checkpoint comes, note that prepareSnapshotPreBarrier is called
before commit, so writer
+ // of partition 2 is not cleaned, but it should be cleaned in the next
checkpoint
+ harness.prepareSnapshotPreBarrier(2);
+ harness.snapshot(2, 20);
+ harness.notifyOfCompletedCheckpoint(2);
+ commit.commit(
+ 2,
+ harness.extractOutputValues().stream()
+ .map(c -> (CommitMessage) c.wrappedCommittable())
+ .collect(Collectors.toList()));
+ harness.prepareSnapshotPreBarrier(3);
+
+ // writer of partition 2 should be cleaned in this snapshot
+ harness.snapshot(3, 30);
+ harness.notifyOfCompletedCheckpoint(3);
+ assertThat(numWriters.getValue()).isEqualTo(2);
+
+ harness.endInput();
+ harness.close();
+ commit.close();
+ }
+
+ // ------------------------------------------------------------------------
+ // Test utils
+ // ------------------------------------------------------------------------
+
+ private RowDataStoreWriteOperator getStoreSinkWriteOperator(FileStoreTable
fileStoreTable) {
+ return new RowDataStoreWriteOperator(
+ fileStoreTable,
+ null,
+ (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
+ new StoreSinkWriteImpl(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ false,
+ true,
+ memoryPool,
+ metricGroup),
+ commitUser);
+ }
+
+ private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
+ FileStoreTable fileStoreTable, boolean waitCompaction) {
+ return new RowDataStoreWriteOperator(
+ fileStoreTable,
+ null,
+ (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
+ new AsyncLookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ waitCompaction,
+ true,
+ memoryPool,
+ metricGroup),
+ commitUser);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void commitAll(
+ OneInputStreamOperatorTestHarness<InternalRow, Committable>
harness,
+ TableCommitImpl commit,
+ long commitIdentifier) {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Committable committable =
+ ((StreamRecord<Committable>)
harness.getOutput().poll()).getValue();
+ assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+ commitMessages.add((CommitMessage)
committable.wrappedCommittable());
+ }
+ commit.commit(commitIdentifier, commitMessages);
+ }
+
private FileStoreTable createFileStoreTable(
RowType rowType, List<String> primaryKeys, List<String>
partitionKeys, Options conf)
throws Exception {