This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b394ff531 [core] Register compaction metrics to flink (#2282)
b394ff531 is described below
commit b394ff5311c47e8f95e03e99ad3bae2a8ad514ca
Author: GuojunLi <[email protected]>
AuthorDate: Fri Nov 24 12:08:41 2023 +0800
[core] Register compaction metrics to flink (#2282)
This closes #2282.
---
.../cdc/CdcDynamicBucketWriteOperatorTest.java | 193 ++++++++++++++
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 88 +++++++
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 75 ++++++
.../paimon/flink/sink/StoreSinkWriteImpl.java | 2 +-
.../paimon/flink/sink/TableWriteOperator.java | 5 +
.../paimon/flink/sink/CompactorSinkITCase.java | 289 +++++++++++++++++++++
.../apache/paimon/flink/sink/FlinkSinkTest.java | 204 +++++++++++++++
7 files changed, 855 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
new file mode 100644
index 000000000..c72a1f637
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CdcDynamicBucketWriteOperator}. */
+public class CdcDynamicBucketWriteOperatorTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Path tablePath;
+ private String commitUser;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @AfterEach
+ public void after() {
+ // assert all connections are closed
+ Predicate<Path> pathPredicate = path ->
path.toString().contains(tempDir.toString());
+ assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ @Test
+ public void testCompactionMetrics() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()},
+ new String[] {"pk", "col1"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("pk"));
+ OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable> harness =
+ createTestHarness(table);
+ CdcDynamicBucketWriteOperator operator =
+ (CdcDynamicBucketWriteOperator) harness.getOneInputOperator();
+ harness.open();
+
+ MetricGroup compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("partition", "_")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+
+ long timestamp = 0;
+ long cpId = 1L;
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pk", "1");
+ fields.put("col1", "2");
+ harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT,
fields), 0), timestamp++);
+ operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+ operator.getWrite().prepareCommit(true, cpId++);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ fields.put("pk", "1");
+ fields.put("col1", "3");
+ harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT,
fields), 0), timestamp);
+ operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+ operator.getWrite().prepareCommit(true, cpId);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ // operator closed, metric groups should be unregistered
+ harness.close();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
+ .isNull();
+ }
+
+ private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable>
+ createTestHarness(FileStoreTable table) throws Exception {
+ CdcDynamicBucketWriteOperator operator =
+ new CdcDynamicBucketWriteOperator(
+ table,
+ (t, commitUser, state, ioManager, memoryPool,
metricGroup) ->
+ new StoreSinkWriteImpl(
+ t,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ false,
+ true,
+ memoryPool,
+ metricGroup),
+ commitUser);
+ TypeSerializer<Tuple2<CdcRecord, Integer>> inputSerializer = new
JavaSerializer<>();
+ TypeSerializer<Committable> outputSerializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator,
inputSerializer);
+ harness.setup(outputSerializer);
+ return harness;
+ }
+
+ private FileStoreTable createFileStoreTable(
+ RowType rowType, List<String> partitions, List<String>
primaryKeys) throws Exception {
+ Options conf = new Options();
+ conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(rowType.getFields(), partitions,
primaryKeys, conf.toMap(), ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index ce7d76287..7d6eedf0b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -43,6 +44,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -67,6 +69,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link CdcRecordStoreMultiWriteOperator}. */
@@ -702,6 +705,91 @@ public class CdcRecordStoreMultiWriteOperatorTest {
harness.close();
}
+ @Test
+ public void testSingleTableCompactionMetrics() throws Exception {
+ Identifier tableId = firstTable;
+ FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
+
+ OneInputStreamOperatorTestHarness<CdcMultiplexRecord,
MultiTableCommittable> testHarness =
+ createTestHarness(catalogLoader);
+
+ testHarness.open();
+
+ CdcRecordStoreMultiWriteOperator operator =
+ (CdcRecordStoreMultiWriteOperator)
testHarness.getOneInputOperator();
+
+ MetricGroup compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("partition", "pt=0")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+
+ long cpId = 1L;
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pt", "0");
+ fields.put("k", "1");
+ fields.put("v", "10");
+
+ CdcMultiplexRecord record =
+ CdcMultiplexRecord.fromCdcRecord(
+ databaseName,
+ tableId.getObjectName(),
+ new CdcRecord(RowKind.INSERT, fields));
+
+ testHarness.processElement(record, 0);
+ operator.writes().get(tableId).compact(row(0), 0, true);
+ operator.writes().get(tableId).prepareCommit(true, cpId++);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ fields.put("pt", "0");
+ fields.put("k", "2");
+ fields.put("v", "12");
+
+ CdcMultiplexRecord record1 =
+ CdcMultiplexRecord.fromCdcRecord(
+ databaseName,
+ tableId.getObjectName(),
+ new CdcRecord(RowKind.INSERT, fields));
+
+ testHarness.processElement(record1, 1);
+ operator.writes().get(tableId).compact(row(0), 0, true);
+ operator.writes().get(tableId).prepareCommit(true, cpId);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ // operator closed, metric groups should be unregistered
+ testHarness.close();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
+ .isNull();
+ }
+
private OneInputStreamOperatorTestHarness<CdcMultiplexRecord,
MultiTableCommittable>
createTestHarness(Catalog.Loader catalogLoader) throws Exception {
CdcRecordStoreMultiWriteOperator operator =
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 3a30af204..87538e047 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -39,6 +41,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterEach;
@@ -250,6 +253,78 @@ public class CdcRecordStoreWriteOperatorTest {
harness.close();
}
+ @Test
+ public void testCompactionMetrics() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()},
+ new String[] {"pk", "col1"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("pk"));
+ OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+ createTestHarness(table);
+ CdcRecordStoreWriteOperator operator =
+ (CdcRecordStoreWriteOperator) harness.getOneInputOperator();
+ harness.open();
+
+ MetricGroup compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("partition", "_")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+
+ long timestamp = 0;
+ long cpId = 1L;
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pk", "1");
+ fields.put("col1", "2");
+ harness.processElement(new CdcRecord(RowKind.INSERT, fields),
timestamp++);
+ operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+ operator.getWrite().prepareCommit(true, cpId++);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ fields.put("pk", "1");
+ fields.put("col1", "3");
+ harness.processElement(new CdcRecord(RowKind.INSERT, fields),
timestamp);
+ operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+ operator.getWrite().prepareCommit(true, cpId);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ // operator closed, metric groups should be unregistered
+ harness.close();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
+ .isNull();
+ }
+
private OneInputStreamOperatorTestHarness<CdcRecord, Committable>
createTestHarness(
FileStoreTable table) throws Exception {
CdcRecordStoreWriteOperator operator =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index b4c6bf06f..5cd72dc2f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -63,7 +63,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected TableWriteImpl<?> write;
- private MetricGroup metricGroup;
+ @Nullable private final MetricGroup metricGroup;
public StoreSinkWriteImpl(
FileStoreTable table,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 74303fd42..004fad293 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -119,4 +119,9 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
throws IOException {
return write.prepareCommit(waitCompaction, checkpointId);
}
+
+ @VisibleForTesting
+ public StoreSinkWrite getWrite() {
+ return write;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index b9571f6b3..f2b8960b0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -19,13 +19,24 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -41,14 +52,22 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TraceableFileIO;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Arrays;
@@ -59,6 +78,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -167,6 +187,188 @@ public class CompactorSinkITCase extends AbstractTestBase {
.isEqualTo(sinkParalellism);
}
+ @Test
+ public void testCompactionMetrics() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ prepareDataFile(table);
+
+ StoreCompactOperator operator = createCompactOperator(table);
+ OneInputStreamOperatorTestHarness<RowData, Committable> testHarness =
+ createTestHarness(operator);
+ testHarness.open();
+
+ MetricGroup compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("partition", "dt=20221208_hh=15")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+ DataFileMetaSerializer fileMetaSerializer = new
DataFileMetaSerializer();
+ RowData record =
+ new FlinkRowData(
+ GenericRow.of(
+ 1L,
+ partition("20221208", 15),
+ 0,
+
fileMetaSerializer.serializeList(Collections.emptyList())));
+
+ long timestamp = 0;
+ testHarness.processElement(record, timestamp++);
+ testHarness.endInput();
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("partition", "dt=20221208_hh=16")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+ record =
+ new FlinkRowData(
+ GenericRow.of(
+ 2L,
+ partition("20221208", 16),
+ 0,
+
fileMetaSerializer.serializeList(Collections.emptyList())));
+
+ testHarness.processElement(record, timestamp);
+ testHarness.endInput();
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ // operator closed, metric groups should be unregistered
+ testHarness.close();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
+ .isNull();
+ }
+
+ @Test
+ public void testMultiTablesCompactionMetrics(@TempDir java.nio.file.Path
tempDir)
+ throws Exception {
+ Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir);
+ Options catalogOptions = new Options();
+ catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+ catalogOptions.set(CatalogOptions.URI, "");
+ Catalog.Loader catalogLoader =
+ () ->
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+ Catalog catalog = catalogLoader.load();
+ String databaseName = "test_db";
+ catalog.createDatabase(databaseName, true);
+ Identifier firstTableId = Identifier.create(databaseName,
"test_table1");
+ Identifier secondTableId = Identifier.create(databaseName,
"test_table2");
+ FileStoreTable firstTable = createCatalogTable(catalog, firstTableId);
+ FileStoreTable secondTable = createCatalogTable(catalog,
secondTableId);
+ prepareDataFile(firstTable);
+ prepareDataFile(secondTable);
+
+ MultiTablesStoreCompactOperator operator =
createMultiTablesCompactOperator(catalogLoader);
+ OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
testHarness =
+ createMultiTablesTestHarness(operator);
+ testHarness.open();
+
+ MetricGroup compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", firstTable.name())
+ .addGroup("partition", "_")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+ DataFileMetaSerializer fileMetaSerializer = new
DataFileMetaSerializer();
+ RowData record =
+ new FlinkRowData(
+ GenericRow.of(
+ 1L,
+ serializeBinaryRow(BinaryRow.EMPTY_ROW),
+ 0,
+
fileMetaSerializer.serializeList(Collections.emptyList()),
+ BinaryString.fromString(databaseName),
+
BinaryString.fromString(firstTableId.getObjectName())));
+
+ long timestamp = 0;
+ testHarness.processElement(record, timestamp++);
+ testHarness.endInput();
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ compactionMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", secondTable.name())
+ .addGroup("partition", "_")
+ .addGroup("bucket", "0")
+ .addGroup("compaction");
+ record =
+ new FlinkRowData(
+ GenericRow.of(
+ 2L,
+ serializeBinaryRow(BinaryRow.EMPTY_ROW),
+ 0,
+
fileMetaSerializer.serializeList(Collections.emptyList()),
+ BinaryString.fromString(databaseName),
+
BinaryString.fromString(secondTableId.getObjectName())));
+
+ testHarness.processElement(record, timestamp);
+ testHarness.endInput();
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
+ .getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
+ .getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ // operator closed, metric groups should be unregistered
+ testHarness.close();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
+ .isNull();
+ assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
+ .isNull();
+ }
+
private List<Map<String, String>> getSpecifiedPartitions() {
Map<String, String> partition1 = new HashMap<>();
partition1.put("dt", "20221208");
@@ -195,4 +397,91 @@ public class CompactorSinkITCase extends AbstractTestBase {
""));
return FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
}
+
+ private FileStoreTable createCatalogTable(Catalog catalog, Identifier
tableIdentifier)
+ throws Exception {
+ Schema tableSchema =
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdentifier, tableSchema, false);
+ return (FileStoreTable) catalog.getTable(tableIdentifier);
+ }
+
+ private OneInputStreamOperatorTestHarness<RowData, Committable>
createTestHarness(
+ OneInputStreamOperator<RowData, Committable> operator) throws
Exception {
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ harness.setup(serializer);
+ return harness;
+ }
+
+ private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
+ createMultiTablesTestHarness(
+ OneInputStreamOperator<RowData, MultiTableCommittable>
operator)
+ throws Exception {
+ TypeSerializer<MultiTableCommittable> serializer =
+ new MultiTableCommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
harness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ harness.setup(serializer);
+ return harness;
+ }
+
+ protected StoreCompactOperator createCompactOperator(FileStoreTable table)
{
+ return new StoreCompactOperator(
+ table,
+ (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ new StoreSinkWriteImpl(
+ t,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ false,
+ false,
+ memoryPool,
+ metricGroup),
+ "test");
+ }
+
+ protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
+ Catalog.Loader catalogLoader) throws Exception {
+ return new MultiTablesStoreCompactOperator(
+ catalogLoader, commitUser, new CheckpointConfig(), false,
false, new Options());
+ }
+
+ private static byte[] partition(String dt, int hh) {
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeString(0, BinaryString.fromString(dt));
+ writer.writeInt(1, hh);
+ writer.complete();
+ return serializeBinaryRow(row);
+ }
+
+ private void prepareDataFile(FileStoreTable table) throws Exception {
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+ write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
+ write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
+ write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
+ write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
+ write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.close();
+ commit.close();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index a93a361dd..16c0e2814 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -19,8 +19,10 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -37,23 +39,32 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -78,6 +89,142 @@ public class FlinkSinkTest {
assertThat(testSpillable(streamExecutionEnvironment,
fileStoreTable)).isFalse();
}
+    /**
+     * Drives a {@link RowDataStoreWriteOperator} through two write/compact/prepare-commit
+     * rounds and checks the gauges found under the "compaction" metric group after each
+     * round, then checks that the gauges can no longer be found once the harness is closed.
+     */
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        // Names of the gauges looked up in the compaction metric group.
+        final String filesBefore = "lastTableFilesCompactedBefore";
+        final String filesAfter = "lastTableFilesCompactedAfter";
+        final String changelogFiles = "lastChangelogFilesCompacted";
+
+        FileStoreTable table = createFileStoreTable();
+        RowDataStoreWriteOperator operator = createWriteOperator(table);
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> testHarness =
+                createTestHarness(operator);
+        // Group for partition "_" and bucket "0"; the compactions below are issued for
+        // BinaryRow.EMPTY_ROW and bucket 0.
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        testHarness.open();
+
+        final long firstCheckpointId = 1L;
+        final long secondCheckpointId = firstCheckpointId + 1;
+
+        // Round one: three rows, then compact and prepare the commit.
+        List<StreamRecord<InternalRow>> firstBatch = new ArrayList<>();
+        firstBatch.add(new StreamRecord<>(GenericRow.of(1, 2)));
+        firstBatch.add(new StreamRecord<>(GenericRow.of(2, 3)));
+        firstBatch.add(new StreamRecord<>(GenericRow.of(3, 4)));
+        testHarness.processElements(firstBatch);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, firstCheckpointId);
+
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesBefore).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesAfter).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, changelogFiles).getValue())
+                .isEqualTo(0L);
+
+        // Round two: one more row, then compact and prepare the commit again.
+        testHarness.processElement(GenericRow.of(4, 5), 0);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, secondCheckpointId);
+
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesBefore).getValue())
+                .isEqualTo(2L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesAfter).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, changelogFiles).getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        for (String gaugeName : new String[] {filesBefore, filesAfter, changelogFiles}) {
+            assertThat(MetricUtils.getGauge(compactionMetricGroup, gaugeName)).isNull();
+        }
+    }
+
+    /**
+     * Drives a {@link DynamicBucketRowWriteOperator} through two write/compact/prepare-commit
+     * rounds and checks the gauges found under the "compaction" metric group for bucket "0"
+     * after each round, then checks that the gauges can no longer be found once the harness
+     * is closed.
+     */
+    @Test
+    public void testDynamicBucketCompactionMetrics() throws Exception {
+        // Names of the gauges looked up in the compaction metric group.
+        final String filesBefore = "lastTableFilesCompactedBefore";
+        final String filesAfter = "lastTableFilesCompactedAfter";
+        final String changelogFiles = "lastChangelogFilesCompacted";
+
+        FileStoreTable table = createFileStoreTable();
+        DynamicBucketRowWriteOperator operator = createDynamicBucketWriteOperator(table);
+        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable>
+                testHarness = createDynamicBucketTestHarness(operator);
+        // Group for partition "_" and bucket "0"; the compactions below are issued for
+        // BinaryRow.EMPTY_ROW and bucket 0.
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        testHarness.open();
+
+        final long firstCheckpointId = 1L;
+        final long secondCheckpointId = firstCheckpointId + 1;
+
+        // Round one: three rows paired with the integers 0, 1 and 2, then compact and
+        // prepare the commit.
+        List<StreamRecord<Tuple2<InternalRow, Integer>>> firstBatch = new ArrayList<>();
+        firstBatch.add(new StreamRecord<>(Tuple2.of(GenericRow.of(1, 2), 0)));
+        firstBatch.add(new StreamRecord<>(Tuple2.of(GenericRow.of(2, 3), 1)));
+        firstBatch.add(new StreamRecord<>(Tuple2.of(GenericRow.of(3, 4), 2)));
+        testHarness.processElements(firstBatch);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, firstCheckpointId);
+
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesBefore).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesAfter).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, changelogFiles).getValue())
+                .isEqualTo(0L);
+
+        // Round two: one more row paired with 0, then compact and prepare the commit again.
+        testHarness.processElement(Tuple2.of(GenericRow.of(4, 5), 0), 0);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, secondCheckpointId);
+
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesBefore).getValue())
+                .isEqualTo(2L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, filesAfter).getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, changelogFiles).getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        for (String gaugeName : new String[] {filesBefore, filesAfter, changelogFiles}) {
+            assertThat(MetricUtils.getGauge(compactionMetricGroup, gaugeName)).isNull();
+        }
+    }
+
private boolean testSpillable(
StreamExecutionEnvironment streamExecutionEnvironment,
FileStoreTable fileStoreTable)
throws Exception {
@@ -134,4 +281,61 @@ public class FlinkSinkTest {
conf,
new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
+
+    /**
+     * Wraps {@code operator} in a {@link OneInputStreamOperatorTestHarness} and sets the
+     * harness up with a {@link Committable} serializer obtained from
+     * {@link CommittableTypeInfo}.
+     *
+     * @param operator the operator under test
+     * @return the harness after {@code setup} has been called; it is not opened here
+     * @throws Exception if creating the harness fails
+     */
+    private OneInputStreamOperatorTestHarness<InternalRow, Committable> createTestHarness(
+            OneInputStreamOperator<InternalRow, Committable> operator) throws Exception {
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> testHarness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        TypeSerializer<Committable> committableSerializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        testHarness.setup(committableSerializer);
+        return testHarness;
+    }
+
+    /**
+     * Wraps {@code operator}, whose input elements are {@code Tuple2<InternalRow, Integer>},
+     * in a {@link OneInputStreamOperatorTestHarness} and sets the harness up with a
+     * {@link Committable} serializer obtained from {@link CommittableTypeInfo}.
+     *
+     * @param operator the operator under test
+     * @return the harness after {@code setup} has been called; it is not opened here
+     * @throws Exception if creating the harness fails
+     */
+    private OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable>
+            createDynamicBucketTestHarness(
+                    OneInputStreamOperator<Tuple2<InternalRow, Integer>, Committable> operator)
+                    throws Exception {
+        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable>
+                testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+        TypeSerializer<Committable> committableSerializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        testHarness.setup(committableSerializer);
+        return testHarness;
+    }
+
+    /**
+     * Creates a {@link RowDataStoreWriteOperator} for {@code table} whose provider lambda
+     * builds a {@link StoreSinkWriteImpl} and forwards the metric group it receives.
+     *
+     * @param table the table the operator writes to
+     * @return a new operator constructed with {@code "test"} as its last argument
+     */
+    protected RowDataStoreWriteOperator createWriteOperator(FileStoreTable table) {
+        return new RowDataStoreWriteOperator(
+                table,
+                // NOTE(review): the second constructor argument is deliberately null here;
+                // confirm what it represents against RowDataStoreWriteOperator.
+                null,
+                (storeTable, user, writeState, ioMgr, pool, metrics) -> {
+                    // NOTE(review): the three boolean flags are positional; confirm their
+                    // meaning against the StoreSinkWriteImpl constructor.
+                    return new StoreSinkWriteImpl(
+                            storeTable,
+                            user,
+                            writeState,
+                            ioMgr,
+                            false,
+                            false,
+                            true,
+                            pool,
+                            metrics);
+                },
+                "test");
+    }
+
+    /**
+     * Creates a {@link DynamicBucketRowWriteOperator} for {@code table} whose provider
+     * lambda builds a {@link StoreSinkWriteImpl} and forwards the metric group it receives.
+     *
+     * @param table the table the operator writes to
+     * @return a new operator constructed with {@code "test"} as its last argument
+     */
+    protected DynamicBucketRowWriteOperator createDynamicBucketWriteOperator(
+            FileStoreTable table) {
+        return new DynamicBucketRowWriteOperator(
+                table,
+                (storeTable, user, writeState, ioMgr, pool, metrics) -> {
+                    // NOTE(review): the three boolean flags are positional; confirm their
+                    // meaning against the StoreSinkWriteImpl constructor.
+                    return new StoreSinkWriteImpl(
+                            storeTable,
+                            user,
+                            writeState,
+                            ioMgr,
+                            false,
+                            false,
+                            true,
+                            pool,
+                            metrics);
+                },
+                "test");
+    }
}