This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push:
new 161b615 [FLINK-33493][connectors/elasticsearch] Fix
ElasticsearchWriterITCase test
161b615 is described below
commit 161b6153cad0e59cc05b45596fe70cd4d53b2fd0
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Nov 10 10:43:11 2023 +0800
[FLINK-33493][connectors/elasticsearch] Fix ElasticsearchWriterITCase test
---
.../sink/ElasticsearchWriterITCase.java | 51 +++++--
.../sink/TestingSinkWriterMetricGroup.java | 163 +++++++++++++++++++++
2 files changed, 205 insertions(+), 9 deletions(-)
diff --git
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index bd020fb..e8002cc 100644
---
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -24,11 +24,11 @@ import
org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -69,6 +69,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Consumer;
import static
org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE;
import static
org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage;
@@ -193,15 +194,16 @@ class ElasticsearchWriterITCase {
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
- final InternalSinkWriterMetricGroup metricGroup =
- InternalSinkWriterMetricGroup.mock(
- metricListener.getMetricGroup(),
operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1,
FlushBackoffType.NONE, 0, 0);
try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
- createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+ createWriter(
+ index,
+ false,
+ bulkProcessorConfig,
+ getSinkWriterMetricGroup(operatorIOMetricGroup))) {
final Counter numBytesOut =
operatorIOMetricGroup.getNumBytesOutCounter();
assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
@@ -267,10 +269,7 @@ class ElasticsearchWriterITCase {
private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
String index, boolean flushOnCheckpoint, BulkProcessorConfig
bulkProcessorConfig) {
return createWriter(
- index,
- flushOnCheckpoint,
- bulkProcessorConfig,
-
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+ index, flushOnCheckpoint, bulkProcessorConfig,
getSinkWriterMetricGroup());
}
private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
@@ -289,6 +288,40 @@ class ElasticsearchWriterITCase {
new TestMailbox());
}
+ private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
+ final OperatorIOMetricGroup operatorIOMetricGroup =
+
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+ return getSinkWriterMetricGroup(operatorIOMetricGroup);
+ }
+
+ private TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
+ OperatorIOMetricGroup operatorIOMetricGroup) {
+ MetricGroup parentMetricGroup = metricListener.getMetricGroup();
+ Counter numRecordsOutErrors =
parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+ Counter numRecordsSendErrors =
+ parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS,
numRecordsOutErrors);
+ Counter numRecordsWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_RECORDS_SEND,
+ operatorIOMetricGroup.getNumRecordsOutCounter());
+ Counter numBytesWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_BYTES_SEND,
operatorIOMetricGroup.getNumBytesOutCounter());
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer =
+ currentSendTimeGauge ->
+ parentMetricGroup.gauge(
+ MetricNames.CURRENT_SEND_TIME,
currentSendTimeGauge);
+ return new TestingSinkWriterMetricGroup.Builder()
+ .setParentMetricGroup(parentMetricGroup)
+ .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
+ .setNumRecordsOutErrorsCounterSupplier(() ->
numRecordsOutErrors)
+ .setNumRecordsSendErrorsCounterSupplier(() ->
numRecordsSendErrors)
+ .setNumRecordsSendCounterSupplier(() -> numRecordsWritten)
+ .setNumBytesSendCounterSupplier(() -> numBytesWritten)
+ .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer)
+ .build();
+ }
+
private static class TestBulkProcessorBuilderFactory implements
BulkProcessorBuilderFactory {
@Override
public BulkProcessor.Builder apply(
diff --git
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
new file mode 100644
index 0000000..b122d66
--- /dev/null
+++
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Testing implementation for {@link SinkWriterMetricGroup}. */
+public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
+ implements SinkWriterMetricGroup {
+
+ private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;
+
+ private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;
+
+ private final Supplier<Counter> numRecordsSendCounterSupplier;
+
+ private final Supplier<Counter> numBytesSendCounterSupplier;
+
+ private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;
+
+ private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;
+
+ public TestingSinkWriterMetricGroup(
+ MetricGroup parentMetricGroup,
+ Supplier<Counter> numRecordsOutErrorsCounterSupplier,
+ Supplier<Counter> numRecordsSendErrorsCounterSupplier,
+ Supplier<Counter> numRecordsSendCounterSupplier,
+ Supplier<Counter> numBytesSendCounterSupplier,
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
+ Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+ super(parentMetricGroup);
+ this.numRecordsOutErrorsCounterSupplier =
numRecordsOutErrorsCounterSupplier;
+ this.numRecordsSendErrorsCounterSupplier =
numRecordsSendErrorsCounterSupplier;
+ this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+ this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+ this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+ this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+ }
+
+ @Override
+ public Counter getNumRecordsOutErrorsCounter() {
+ return numRecordsOutErrorsCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumRecordsSendErrorsCounter() {
+ return numRecordsSendErrorsCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumRecordsSendCounter() {
+ return numRecordsSendCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumBytesSendCounter() {
+ return numBytesSendCounterSupplier.get();
+ }
+
+ @Override
+ public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
+ currentSendTimeGaugeConsumer.accept(gauge);
+ }
+
+ @Override
+ public OperatorIOMetricGroup getIOMetricGroup() {
+ return ioMetricGroupSupplier.get();
+ }
+
+ /** Builder for {@link TestingSinkWriterMetricGroup}. */
+ public static class Builder {
+
+ private MetricGroup parentMetricGroup = null;
+
+ private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () ->
null;
+
+ private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () ->
null;
+
+ private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;
+
+ private Supplier<Counter> numBytesSendCounterSupplier = () -> null;
+
+ private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = counter
-> {};
+
+ private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () ->
null;
+
+ public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
+ this.parentMetricGroup = parentMetricGroup;
+ return this;
+ }
+
+ public Builder setNumRecordsOutErrorsCounterSupplier(
+ Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
+ this.numRecordsOutErrorsCounterSupplier =
numRecordsOutErrorsCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumRecordsSendErrorsCounterSupplier(
+ Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
+ this.numRecordsSendErrorsCounterSupplier =
numRecordsSendErrorsCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumRecordsSendCounterSupplier(
+ Supplier<Counter> numRecordsSendCounterSupplier) {
+ this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumBytesSendCounterSupplier(
+ Supplier<Counter> numBytesSendCounterSupplier) {
+ this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+ return this;
+ }
+
+ public Builder setCurrentSendTimeGaugeConsumer(
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
+ this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+ return this;
+ }
+
+ public Builder setIoMetricGroupSupplier(
+ Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+ this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+ return this;
+ }
+
+ public TestingSinkWriterMetricGroup build() {
+ return new TestingSinkWriterMetricGroup(
+ parentMetricGroup,
+ numRecordsOutErrorsCounterSupplier,
+ numRecordsSendErrorsCounterSupplier,
+ numRecordsSendCounterSupplier,
+ numBytesSendCounterSupplier,
+ currentSendTimeGaugeConsumer,
+ ioMetricGroupSupplier);
+ }
+ }
+}