This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 84babb66a [core] Add numWriters metrics (#3234)
84babb66a is described below

commit 84babb66ab78d06808399ec5dfee892b14125dae
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 16:06:48 2024 +0800

    [core] Add numWriters metrics (#3234)
---
 docs/content/maintenance/metrics.md                |   5 +
 .../paimon/operation/MemoryFileStoreWrite.java     |  17 ++
 .../operation/metrics/WriterBufferMetric.java      |  13 ++
 .../paimon/flink/sink/WriterOperatorTest.java      | 220 ++++++++++++++-------
 4 files changed, 185 insertions(+), 70 deletions(-)

diff --git a/docs/content/maintenance/metrics.md 
b/docs/content/maintenance/metrics.md
index 5950c518e..203326c40 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -200,6 +200,11 @@ Below is a list of Paimon built-in metrics. They are 
summarized into types of sca
     </tr>
     </thead>
     <tbody>
+        <tr>
+            <td>numWriters</td>
+            <td>Gauge</td>
+            <td>Number of writers in this parallelism.</td>
+        </tr>
         <tr>
             <td>bufferPreemptCount</td>
             <td>Gauge</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index fc5bd926a..514fb92af 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.metrics.WriterBufferMetric;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -110,6 +112,7 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
                             + " but this is: "
                             + writer.getClass());
         }
+
         if (writeBufferPool == null) {
             LOG.debug("Use default heap memory segment pool for write 
buffer.");
             writeBufferPool =
@@ -119,6 +122,10 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
                             .addOwners(this::memoryOwners);
         }
         writeBufferPool.notifyNewOwner((MemoryOwner) writer);
+
+        if (writerBufferMetric != null) {
+            writerBufferMetric.increaseNumWriters();
+        }
     }
 
     @Override
@@ -135,6 +142,16 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
         }
     }
 
+    @Override
+    public List<CommitMessage> prepareCommit(boolean waitCompaction, long 
commitIdentifier)
+            throws Exception {
+        List<CommitMessage> result = super.prepareCommit(waitCompaction, 
commitIdentifier);
+        if (writerBufferMetric != null) {
+            
writerBufferMetric.setNumWriters(writers.values().stream().mapToInt(Map::size).sum());
+        }
+        return result;
+    }
+
     @Override
     public void close() throws Exception {
         super.close();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index a89531bc4..d414383aa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -22,6 +22,7 @@ import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.metrics.MetricGroup;
 import org.apache.paimon.metrics.MetricRegistry;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -29,17 +30,21 @@ import java.util.function.Supplier;
 public class WriterBufferMetric {
 
     private static final String GROUP_NAME = "writerBuffer";
+    private static final String NUM_WRITERS = "numWriters";
     private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
     private static final String USED_WRITE_BUFFER_SIZE = 
"usedWriteBufferSizeByte";
     private static final String TOTAL_WRITE_BUFFER_SIZE = 
"totalWriteBufferSizeByte";
 
     private final MetricGroup metricGroup;
+    private final AtomicInteger numWriters;
 
     public WriterBufferMetric(
             Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
             MetricRegistry metricRegistry,
             String tableName) {
         metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+        numWriters = new AtomicInteger(0);
+        metricGroup.gauge(NUM_WRITERS, numWriters::get);
         metricGroup.gauge(
                 BUFFER_PREEMPT_COUNT,
                 () ->
@@ -62,6 +67,14 @@ public class WriterBufferMetric {
         return memoryPoolFactory == null ? -1 : 
function.apply(memoryPoolFactory);
     }
 
+    public void increaseNumWriters() {
+        numWriters.incrementAndGet();
+    }
+
+    public void setNumWriters(int x) {
+        numWriters.set(x);
+    }
+
     public void close() {
         this.metricGroup.close();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index d294dad79..3a8c15571 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -60,6 +60,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -68,10 +70,12 @@ public class WriterOperatorTest {
 
     @TempDir public java.nio.file.Path tempDir;
     private Path tablePath;
+    private String commitUser;
 
     @BeforeEach
     public void before() {
         tablePath = new Path(tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
     }
 
     @Test
@@ -111,22 +115,7 @@ public class WriterOperatorTest {
 
     private void testMetricsImpl(FileStoreTable fileStoreTable) throws 
Exception {
         String tableName = tablePath.getName();
-        RowDataStoreWriteOperator operator =
-                new RowDataStoreWriteOperator(
-                        fileStoreTable,
-                        null,
-                        (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                                new StoreSinkWriteImpl(
-                                        table,
-                                        commitUser,
-                                        state,
-                                        ioManager,
-                                        false,
-                                        false,
-                                        true,
-                                        memoryPool,
-                                        metricGroup),
-                        "test");
+        RowDataStoreWriteOperator operator = 
getStoreSinkWriteOperator(fileStoreTable);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                 createHarness(operator);
 
@@ -188,7 +177,7 @@ public class WriterOperatorTest {
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                 createHarness(operator);
 
-        TableCommitImpl commit = fileStoreTable.newCommit("test");
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
 
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
@@ -251,40 +240,6 @@ public class WriterOperatorTest {
                 .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", 
"+I[3, 30, 301]");
     }
 
-    private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
-            FileStoreTable fileStoreTable, boolean waitCompaction) {
-        return new RowDataStoreWriteOperator(
-                fileStoreTable,
-                null,
-                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
-                        new AsyncLookupSinkWrite(
-                                table,
-                                commitUser,
-                                state,
-                                ioManager,
-                                false,
-                                waitCompaction,
-                                true,
-                                memoryPool,
-                                metricGroup),
-                "test");
-    }
-
-    @SuppressWarnings("unchecked")
-    private void commitAll(
-            OneInputStreamOperatorTestHarness<InternalRow, Committable> 
harness,
-            TableCommitImpl commit,
-            long commitIdentifier) {
-        List<CommitMessage> commitMessages = new ArrayList<>();
-        while (!harness.getOutput().isEmpty()) {
-            Committable committable =
-                    ((StreamRecord<Committable>) 
harness.getOutput().poll()).getValue();
-            assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
-            commitMessages.add((CommitMessage) 
committable.wrappedCommittable());
-        }
-        commit.commit(commitIdentifier, commitMessages);
-    }
-
     @Test
     public void testChangelog() throws Exception {
         testChangelog(false);
@@ -308,28 +263,11 @@ public class WriterOperatorTest {
         FileStoreTable fileStoreTable =
                 createFileStoreTable(
                         rowType, Arrays.asList("pt", "k"), 
Collections.singletonList("k"), options);
-
-        RowDataStoreWriteOperator operator =
-                new RowDataStoreWriteOperator(
-                        fileStoreTable,
-                        null,
-                        (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                                new StoreSinkWriteImpl(
-                                        table,
-                                        commitUser,
-                                        state,
-                                        ioManager,
-                                        false,
-                                        false,
-                                        true,
-                                        memoryPool,
-                                        metricGroup),
-                        "test");
-
+        RowDataStoreWriteOperator operator = 
getStoreSinkWriteOperator(fileStoreTable);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                 createHarness(operator);
 
-        TableCommitImpl commit = fileStoreTable.newCommit("test");
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
 
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
@@ -380,6 +318,148 @@ public class WriterOperatorTest {
         }
     }
 
+    @Test
+    public void testNumWritersMetric() throws Exception {
+        String tableName = tablePath.getName();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set("bucket", "1");
+        options.set("write-buffer-size", "256 b");
+        options.set("page-size", "32 b");
+
+        FileStoreTable fileStoreTable =
+                createFileStoreTable(
+                        rowType,
+                        Arrays.asList("pt", "k"),
+                        Collections.singletonList("pt"),
+                        options);
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+
+        RowDataStoreWriteOperator rowDataStoreWriteOperator =
+                getStoreSinkWriteOperator(fileStoreTable);
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createHarness(rowDataStoreWriteOperator);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        OperatorMetricGroup metricGroup = 
rowDataStoreWriteOperator.getMetricGroup();
+        MetricGroup writerBufferMetricGroup =
+                metricGroup
+                        .addGroup("paimon")
+                        .addGroup("table", tableName)
+                        .addGroup("writerBuffer");
+
+        Gauge<Integer> numWriters =
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"numWriters");
+
+        // write into three partitions
+        harness.processElement(GenericRow.of(1, 10, 100), 1);
+        harness.processElement(GenericRow.of(2, 20, 200), 2);
+        harness.processElement(GenericRow.of(3, 30, 300), 3);
+        assertThat(numWriters.getValue()).isEqualTo(3);
+
+        // commit messages in three partitions, no writer should be cleaned
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 10);
+        harness.notifyOfCompletedCheckpoint(1);
+        commit.commit(
+                1,
+                harness.extractOutputValues().stream()
+                        .map(c -> (CommitMessage) c.wrappedCommittable())
+                        .collect(Collectors.toList()));
+        assertThat(numWriters.getValue()).isEqualTo(3);
+
+        // write into two partitions
+        harness.processElement(GenericRow.of(1, 11, 110), 11);
+        harness.processElement(GenericRow.of(3, 13, 130), 13);
+        // checkpoint has not come yet, so no writer should be cleaned
+        assertThat(numWriters.getValue()).isEqualTo(3);
+
+        // checkpoint comes, note that prepareSnapshotPreBarrier is called 
before commit, so writer
+        // of partition 2 is not cleaned, but it should be cleaned in the next 
checkpoint
+        harness.prepareSnapshotPreBarrier(2);
+        harness.snapshot(2, 20);
+        harness.notifyOfCompletedCheckpoint(2);
+        commit.commit(
+                2,
+                harness.extractOutputValues().stream()
+                        .map(c -> (CommitMessage) c.wrappedCommittable())
+                        .collect(Collectors.toList()));
+        harness.prepareSnapshotPreBarrier(3);
+
+        // writer of partition 2 should be cleaned in this snapshot
+        harness.snapshot(3, 30);
+        harness.notifyOfCompletedCheckpoint(3);
+        assertThat(numWriters.getValue()).isEqualTo(2);
+
+        harness.endInput();
+        harness.close();
+        commit.close();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Test utils
+    // ------------------------------------------------------------------------
+
+    private RowDataStoreWriteOperator getStoreSinkWriteOperator(FileStoreTable 
fileStoreTable) {
+        return new RowDataStoreWriteOperator(
+                fileStoreTable,
+                null,
+                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
+                        new StoreSinkWriteImpl(
+                                table,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                false,
+                                true,
+                                memoryPool,
+                                metricGroup),
+                commitUser);
+    }
+
+    private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
+            FileStoreTable fileStoreTable, boolean waitCompaction) {
+        return new RowDataStoreWriteOperator(
+                fileStoreTable,
+                null,
+                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
+                        new AsyncLookupSinkWrite(
+                                table,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                waitCompaction,
+                                true,
+                                memoryPool,
+                                metricGroup),
+                commitUser);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void commitAll(
+            OneInputStreamOperatorTestHarness<InternalRow, Committable> 
harness,
+            TableCommitImpl commit,
+            long commitIdentifier) {
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        while (!harness.getOutput().isEmpty()) {
+            Committable committable =
+                    ((StreamRecord<Committable>) 
harness.getOutput().poll()).getValue();
+            assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+            commitMessages.add((CommitMessage) 
committable.wrappedCommittable());
+        }
+        commit.commit(commitIdentifier, commitMessages);
+    }
+
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> primaryKeys, List<String> 
partitionKeys, Options conf)
             throws Exception {

Reply via email to