AHeise commented on a change in pull request #16875:
URL: https://github.com/apache/flink/pull/16875#discussion_r692752256
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -82,49 +94,107 @@
.withLogConsumer(LOG_CONSUMER)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
- @Before
- public void setUp() {
- metricListener = new MetricListener();
+ @BeforeAll
+ public static void beforeAll() {
+ KAFKA_CONTAINER.start();
}
- @Parameterized.Parameters
- public static List<DeliveryGuarantee> guarantees() {
- return ImmutableList.of(
- DeliveryGuarantee.NONE,
- DeliveryGuarantee.AT_LEAST_ONCE,
- DeliveryGuarantee.EXACTLY_ONCE);
+ @AfterAll
+ public static void afterAll() {
+ KAFKA_CONTAINER.stop();
}
- public KafkaWriterITCase(DeliveryGuarantee guarantee) {
- this.guarantee = guarantee;
+ @BeforeEach
+ public void setUp() {
+ metricListener = new MetricListener();
}
- @Test
- public void testRegisterMetrics() throws Exception {
+ @ParameterizedTest
+ @EnumSource(DeliveryGuarantee.class)
Review comment:
👍
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -200,4 +270,96 @@ public UserCodeClassLoader getUserCodeClassLoader() {
throw new UnsupportedOperationException("Not implemented.");
}
}
+
+ private static class DummySinkWriterContext implements SinkWriter.Context {
+ @Override
+ public long currentWatermark() {
+ return 0;
+ }
+
+ @Override
+ public Long timestamp() {
+ return null;
+ }
+ }
+
+ private static class NumBytesOutOperatorIOMetricGroup implements OperatorIOMetricGroup {
Review comment:
Could you please check if you can use
`UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup()`
instead? For another idea, see below.
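For illustration, a minimal sketch of that substitution in the test, assuming the
`mock(MetricGroup, OperatorIOMetricGroup)` overload from this diff (variable names
are placeholders):

```java
// Sketch: use a counter-backed IO metric group from flink-runtime's test
// utilities instead of the hand-rolled NumBytesOutOperatorIOMetricGroup.
final OperatorIOMetricGroup operatorIOMetricGroup =
        UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
                .getIOMetricGroup();
final InternalSinkWriterMetricGroup metricGroup =
        InternalSinkWriterMetricGroup.mock(
                metricListener.getMetricGroup(), operatorIOMetricGroup);
```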
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -32,40 +40,44 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.Duration;
-import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.IntStream;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertThrows;
/** Tests for the standalone KafkaWriter. */
-@RunWith(Parameterized.class)
public class KafkaWriterITCase {
private static final Logger LOG = LoggerFactory.getLogger(KafkaWriterITCase.class);
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
-
- private final DeliveryGuarantee guarantee;
+ private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
private MetricListener metricListener;
- @ClassRule
- public static final KafkaContainer KAFKA_CONTAINER =
+ private static final KafkaContainer KAFKA_CONTAINER =
Review comment:
How about adding a `@Nested` test class? Then you don't need to spawn the
container when running a test in the IDE that doesn't need it.
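A rough sketch of the idea; the nested class name is a placeholder, and the
`PER_CLASS` lifecycle is one way to allow non-static `@BeforeAll`/`@AfterAll`
inside a `@Nested` class:

```java
@Nested
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestsRequiringKafkaContainer {

    @BeforeAll
    void startContainer() {
        KAFKA_CONTAINER.start();
    }

    @AfterAll
    void stopContainer() {
        KAFKA_CONTAINER.stop();
    }

    @ParameterizedTest
    @EnumSource(DeliveryGuarantee.class)
    void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
        // ... body as in the diff above
    }
}
```

Tests that never touch Kafka would stay outside the nested class and run
without the container.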
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -82,49 +94,107 @@
.withLogConsumer(LOG_CONSUMER)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
- @Before
- public void setUp() {
- metricListener = new MetricListener();
+ @BeforeAll
+ public static void beforeAll() {
+ KAFKA_CONTAINER.start();
}
- @Parameterized.Parameters
- public static List<DeliveryGuarantee> guarantees() {
- return ImmutableList.of(
- DeliveryGuarantee.NONE,
- DeliveryGuarantee.AT_LEAST_ONCE,
- DeliveryGuarantee.EXACTLY_ONCE);
+ @AfterAll
+ public static void afterAll() {
+ KAFKA_CONTAINER.stop();
}
- public KafkaWriterITCase(DeliveryGuarantee guarantee) {
- this.guarantee = guarantee;
+ @BeforeEach
+ public void setUp() {
+ metricListener = new MetricListener();
}
- @Test
- public void testRegisterMetrics() throws Exception {
+ @ParameterizedTest
+ @EnumSource(DeliveryGuarantee.class)
+ public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
try (final KafkaWriter<Integer> ignored =
- createWriterWithConfiguration(getKafkaClientConfiguration())) {
+ createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME);
}
}
- @Test
- public void testNotRegisterMetrics() throws Exception {
+ @ParameterizedTest
+ @EnumSource(DeliveryGuarantee.class)
+ public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
final Properties config = getKafkaClientConfiguration();
config.put("flink.disable-metrics", "true");
- try (final KafkaWriter<Integer> ignored = createWriterWithConfiguration(config)) {
+ try (final KafkaWriter<Integer> ignored =
+ createWriterWithConfiguration(config, guarantee)) {
assertThrows(
IllegalArgumentException.class,
() ->
metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME));
}
}
- private KafkaWriter<Integer> createWriterWithConfiguration(Properties config) {
+ @Test
+ public void testIncreasingByteOutCounter() throws Exception {
+ final OperatorIOMetricGroup operatorIOMetricGroup = new NumBytesOutOperatorIOMetricGroup();
+ final InternalSinkWriterMetricGroup metricGroup =
+ InternalSinkWriterMetricGroup.mock(
+ metricListener.getMetricGroup(), operatorIOMetricGroup);
+ try (final KafkaWriter<Integer> writer =
+ createWriterWithConfiguration(
+ getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
+ final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
+ Assertions.assertEquals(numBytesOut.getCount(), 0L);
+ writer.write(1, SINK_WRITER_CONTEXT);
+ metricListener.getGauge("currentSendTime");
+ MatcherAssert.assertThat(numBytesOut.getCount(), greaterThan(0L));
+ }
+ }
+
+ @Test
+ public void testCurrentSendTimeMetric() throws Exception {
+ final OperatorIOMetricGroup operatorIOMetricGroup = new NumBytesOutOperatorIOMetricGroup();
+ final InternalSinkWriterMetricGroup metricGroup =
+ InternalSinkWriterMetricGroup.mock(
+ metricListener.getMetricGroup(), operatorIOMetricGroup);
+ try (final KafkaWriter<Integer> writer =
+ createWriterWithConfiguration(
+ getKafkaClientConfiguration(),
+ DeliveryGuarantee.AT_LEAST_ONCE,
+ metricGroup)) {
+ final Gauge<Long> currentSendTime = metricListener.getGauge("currentSendTime");
+ Assertions.assertEquals(currentSendTime.getValue(), 0L);
+ IntStream.range(0, 100)
+ .forEach(
+ (run) -> {
+ try {
+ writer.write(1, SINK_WRITER_CONTEXT);
+ // Manually flush the records to generate a sendTime
+ if (run % 10 == 0) {
+ writer.prepareCommit(false);
Review comment:
How about calling a `@VisibleForTesting`-only flush method here instead of
`prepareCommit(false)`?
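Something like this, as a sketch; `flushRecords` is a hypothetical name, not an
existing method on `KafkaWriter`:

```java
// In KafkaWriter (sketch): a test-only hook that flushes buffered records
// without going through the commit path.
@VisibleForTesting
void flushRecords() {
    currentProducer.flush();
}
```

The test loop could then call `writer.flushRecords()` instead of
`writer.prepareCommit(false)`.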
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
##########
@@ -55,6 +55,12 @@ public static InternalSinkWriterMetricGroup mock(MetricGroup metricGroup) {
metricGroup,
UnregisteredMetricsGroup.createOperatorIOMetricGroup());
Review comment:
If we use
`UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup()`
here, maybe we can get rid of your new `mock`? Then we could just use
`metricGroup#getIOMetricGroup` for assertions.
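Sketched out, assuming the constructor takes the parent group plus an
`OperatorIOMetricGroup` as elsewhere in this file:

```java
// Sketch: a single mock(...) factory whose IO metric group is backed by real
// counters, so tests can assert via metricGroup.getIOMetricGroup().
public static InternalSinkWriterMetricGroup mock(MetricGroup metricGroup) {
    return new InternalSinkWriterMetricGroup(
            metricGroup,
            UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
                    .getIOMetricGroup());
}
```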
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -139,6 +147,7 @@ public void write(IN element, Context context) throws IOException {
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
pendingRecords.incrementAndGet();
currentProducer.send(record, deliveryCallback);
+ MetricUtil.sync(byteOutMetric, numBytesOutCounter);
Review comment:
I just saw that `metricValue` is under a lock, so this sync is probably too
expensive to do for every record. How about adding a timer service callback
that syncs every second? Or we could use `Partitioner#newBatch` to sync.
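To illustrate the timer idea with a plain JDK scheduler (a real implementation
would presumably hook into Flink's processing-time service rather than spawn
its own thread, and would need to mind thread safety):

```java
// Sketch: sync the Kafka byte-out metric into Flink's counter once per second
// instead of on every send(). Field names match the diff above; the
// scheduling mechanism is simplified for illustration.
private final ScheduledExecutorService metricSyncScheduler =
        Executors.newSingleThreadScheduledExecutor();

void startMetricSync() {
    metricSyncScheduler.scheduleAtFixedRate(
            () -> MetricUtil.sync(byteOutMetric, numBytesOutCounter),
            1L, 1L, TimeUnit.SECONDS);
}
```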