This is an automated email from the ASF dual-hosted git repository.
mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 20bf67d03 KUDU-3662 [5/n] Add metrics support to replication
20bf67d03 is described below
commit 20bf67d0336c128d0b114e796be4c87b84ab746b
Author: Marton Greber <[email protected]>
AuthorDate: Mon Aug 4 17:51:30 2025 +0200
KUDU-3662 [5/n] Add metrics support to replication
Adds three key metrics to monitor replication job state:
- lastEndTimestamp: closing timestamp for diff scan iterations
- pendingCount: number of splits currently being fetched by readers
- unassignedCount: number of splits enumerated but not yet assigned
Uses reflection-based wrapper pattern to access private fields
in KuduSourceEnumerator as temporary workaround until upstream
Flink Kudu connector implements metrics support (FLINK-38187).
The MetricWrappedKuduSource wraps the original KuduSource and
delegates all operations while injecting a MetricWrappedKuduEnumerator
that exposes internal state as Flink metrics. This approach allows
clean removal once upstream metrics are available.
Includes unit tests that verify metric existence and monotonic timestamp
progression; the pending and unassigned split counts are difficult
to test reliably in the current test setup.
Change-Id: Ibfbd34c707e7539ee88863399ae3061683f8bb3b
Reviewed-on: http://gerrit.cloudera.org:8080/23246
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Gabriella Lotz <[email protected]>
Reviewed-by: Ashwani Raina <[email protected]>
Tested-by: Marton Greber <[email protected]>
---
java/config/spotbugs/excludeFilter.xml | 10 ++
java/kudu-replication/build.gradle | 2 +-
.../kudu/replication/ReplicationEnvProvider.java | 11 +-
.../wrappedsource/MetricWrappedKuduEnumerator.java | 145 ++++++++++++++++++++
.../wrappedsource/MetricWrappedKuduSource.java | 103 ++++++++++++++
.../wrappedsource/ReflectionSecurityUtils.java | 122 +++++++++++++++++
.../kudu/replication/TestReplicationMetrics.java | 149 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 34 +++++
8 files changed, 574 insertions(+), 2 deletions(-)
diff --git a/java/config/spotbugs/excludeFilter.xml
b/java/config/spotbugs/excludeFilter.xml
index 11da3f997..91dde282c 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -375,4 +375,14 @@
<Class name="org.apache.kudu.test.cluster.FakeDNS"/>
<Bug pattern="DP_DO_INSIDE_DO_PRIVILEGED" />
</Match>
+
+ <!-- kudu-replication exclusions -->
+ <Match>
+ <!-- SerialVersionUID is not needed as the original Flink source
classes
+ also don't define it, and these are wrapper classes. -->
+ <Class name="~org\.apache\.kudu\.replication\.wrappedsource\..*"/>
+ <Or>
+ <Bug pattern="SE_NO_SERIALVERSIONID" />
+ </Or>
+ </Match>
</FindBugsFilter>
diff --git a/java/kudu-replication/build.gradle
b/java/kudu-replication/build.gradle
index 6c6c985d6..8d73c49aa 100644
--- a/java/kudu-replication/build.gradle
+++ b/java/kudu-replication/build.gradle
@@ -28,11 +28,11 @@ dependencies {
testImplementation libs.junit
testImplementation libs.assertj
- testImplementation libs.junit
testImplementation project(path: ":kudu-test-utils", configuration:
"shadow")
testImplementation libs.log4jApi
testImplementation libs.log4jCore
testImplementation libs.log4jSlf4jImpl
+ testImplementation libs.flinkTestUtils
}
shadowJar {
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
index f2691983c..672131303 100644
---
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
@@ -20,15 +20,20 @@ import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kudu.connector.KuduTableInfo;
import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connector.kudu.sink.KuduSink;
import org.apache.flink.connector.kudu.sink.KuduSinkBuilder;
import org.apache.flink.connector.kudu.source.KuduSource;
+import
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
+import org.apache.kudu.replication.wrappedsource.MetricWrappedKuduSource;
+
/**
* Provides a configured {@link StreamExecutionEnvironment} for the
replication job.
* This class does not execute the environment; it only prepares it.
@@ -63,13 +68,17 @@ public class ReplicationEnvProvider {
.setDiscoveryPeriod(Duration.ofSeconds(jobConfig.getDiscoveryIntervalSeconds()))
.build();
+ // TODO(mgreber): remove this line once FLINK-38187 is resolved.
+ Source<Row, KuduSourceSplit, KuduSourceEnumeratorState> wrappedSource =
+ new MetricWrappedKuduSource<>(kuduSource);
+
KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
.setWriterConfig(writerConfig)
.setTableInfo(KuduTableInfo.forTable(jobConfig.getSinkTableName()))
.setOperationMapper(new CustomReplicationOperationMapper())
.build();
- env.fromSource(kuduSource, WatermarkStrategy.noWatermarks(), "KuduSource")
+ env.fromSource(wrappedSource, WatermarkStrategy.noWatermarks(),
"KuduSource")
.returns(TypeInformation.of(Row.class))
.sinkTo(kuduSink);
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
new file mode 100644
index 000000000..b92c8af4c
--- /dev/null
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumerator;
+import
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.metrics.Gauge;
+
+import org.apache.kudu.util.HybridTimeUtil;
+
+/**
+ * A wrapper around KuduSourceEnumerator that adds metrics support until
metrics are implemented
+ * in the upstream Flink Kudu connector (FLINK-38187).
+ * Uses the delegate pattern - all SplitEnumerator methods are delegated to
the wrapped
+ * KuduSourceEnumerator, while additionally exposing internal enumerator
fields as metrics
+ * via reflection to provide visibility into split assignment and timestamp
tracking.
+ * TODO(mgreber): remove this file once FLINK-38187 is resolved.
+ */
+public class MetricWrappedKuduEnumerator implements
SplitEnumerator<KuduSourceSplit,
+
KuduSourceEnumeratorState> {
+
+ private final KuduSourceEnumerator delegate;
+ private final SplitEnumeratorContext<KuduSourceSplit> context;
+
+ private final Field lastEndTimestampField;
+ private final Field pendingField;
+ private final Field unassignedField;
+
+ public MetricWrappedKuduEnumerator(
+ KuduTableInfo tableInfo,
+ KuduReaderConfig readerConfig,
+ Boundedness boundedness,
+ Duration discoveryInterval,
+ SplitEnumeratorContext<KuduSourceSplit> context,
+ @Nullable KuduSourceEnumeratorState restoredState
+ ) {
+ this.context = context;
+
+ if (restoredState != null) {
+ this.delegate = new KuduSourceEnumerator(
+ tableInfo, readerConfig, boundedness, discoveryInterval,
context, restoredState);
+ } else {
+ this.delegate = new KuduSourceEnumerator(
+ tableInfo, readerConfig, boundedness, discoveryInterval,
context);
+ }
+
+ // Initialize final fields using privileged access to reflection operations
+ this.lastEndTimestampField =
+ ReflectionSecurityUtils.getAccessibleField(delegate,
"lastEndTimestamp");
+ this.pendingField = ReflectionSecurityUtils.getAccessibleField(delegate,
"pending");
+ this.unassignedField =
ReflectionSecurityUtils.getAccessibleField(delegate, "unassigned");
+
+ }
+
+
+
+ @Override
+ // Safe casts: lambdas return correct types for Gauge<Long> and
Gauge<Integer>
+ @SuppressWarnings("unchecked")
+ public void start() {
+ context.metricGroup().gauge("lastEndTimestamp", (Gauge<Long>)
this::getLastEndTimestamp);
+ context.metricGroup().gauge("pendingCount", (Gauge<Integer>)
this::getPendingCount);
+ context.metricGroup().gauge("unassignedCount", (Gauge<Integer>)
this::getUnassignedCount);
+ delegate.start();
+ }
+
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {
+ delegate.handleSplitRequest(subtaskId, requesterHostname);
+ }
+
+ @Override
+ public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+ delegate.addSplitsBack(splits, subtaskId);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ delegate.addReader(subtaskId);
+ }
+
+ @Override
+ public KuduSourceEnumeratorState snapshotState(long checkpointId) throws
Exception {
+ return delegate.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void close() {
+ try {
+ delegate.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Error closing KuduSplitEnumerator", e);
+ }
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ delegate.handleSourceEvent(subtaskId, sourceEvent);
+ }
+
+ private long getLastEndTimestamp() {
+ long hybridTime =
ReflectionSecurityUtils.getLongFieldValue(lastEndTimestampField, delegate);
+ long[] parsed = HybridTimeUtil.HTTimestampToPhysicalAndLogical(hybridTime);
+ long epochMicros = parsed[0];
+ return epochMicros / 1_000;
+ }
+
+ private int getPendingCount() {
+ List<KuduSourceSplit> pending =
ReflectionSecurityUtils.getFieldValue(pendingField, delegate);
+ return pending.size();
+ }
+
+ private int getUnassignedCount() {
+ List<KuduSourceSplit> unassigned =
+ ReflectionSecurityUtils.getFieldValue(unassignedField, delegate);
+ return unassigned.size();
+ }
+}
+
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
new file mode 100644
index 000000000..a7ecc7ed5
--- /dev/null
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.time.Duration;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.source.KuduSource;
+import
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A wrapper around KuduSource that adds metrics support until metrics are
implemented
+ * in the upstream Flink Kudu connector (FLINK-38187).
+ * Uses the delegate pattern - all Source methods are delegated to the wrapped
KuduSource,
+ * except for enumerator creation where we inject our own
MetricWrappedKuduEnumerator
+ * to collect and expose metrics.
+ * TODO(mgreber): remove once FLINK-38187 is resolved.
+ */
+public class MetricWrappedKuduSource<OUT> implements Source<OUT,
+ KuduSourceSplit,
+
KuduSourceEnumeratorState> {
+ private final KuduSource<OUT> delegate;
+ private final KuduReaderConfig readerConfig;
+ private final KuduTableInfo tableInfo;
+ private final Boundedness boundedness;
+ private final Duration discoveryInterval;
+
+ public MetricWrappedKuduSource(KuduSource<OUT> delegate) {
+ this.delegate = delegate;
+ this.boundedness = delegate.getBoundedness();
+
+ // We use reflection here for convenience - ideally we could plumb configs
in the
+ // environment provider, but this wrapping approach allows us to simply
supply a KuduSource
+ // to the wrapped source in a clean way. This makes it easier to remove
the wrapper later
+ // when the metrics are implemented in the connector (FLINK-38187),
+ // as it's a drop-in replacement.
+ this.readerConfig = ReflectionSecurityUtils.getPrivateFieldValue(delegate,
"readerConfig");
+ this.tableInfo = ReflectionSecurityUtils.getPrivateFieldValue(delegate,
"tableInfo");
+ this.discoveryInterval =
+ ReflectionSecurityUtils.getPrivateFieldValue(delegate,
"discoveryPeriod");
+ }
+
+
+
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness;
+ }
+
+ @Override
+ public SourceReader<OUT, KuduSourceSplit> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ return delegate.createReader(readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<KuduSourceSplit, KuduSourceEnumeratorState>
createEnumerator(
+ SplitEnumeratorContext<KuduSourceSplit> context) {
+ return new MetricWrappedKuduEnumerator(
+ tableInfo, readerConfig, boundedness, discoveryInterval, context,
null);
+ }
+
+ @Override
+ public SplitEnumerator<KuduSourceSplit, KuduSourceEnumeratorState>
restoreEnumerator(
+ SplitEnumeratorContext<KuduSourceSplit> context,
+ KuduSourceEnumeratorState checkpoint) {
+ return new MetricWrappedKuduEnumerator(
+ tableInfo, readerConfig, boundedness, discoveryInterval, context,
checkpoint);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<KuduSourceSplit> getSplitSerializer() {
+ return delegate.getSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<KuduSourceEnumeratorState>
getEnumeratorCheckpointSerializer() {
+ return delegate.getEnumeratorCheckpointSerializer();
+ }
+}
+
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
new file mode 100644
index 000000000..71ef4e39c
--- /dev/null
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.lang.reflect.Field;
+
+import org.apache.kudu.client.internals.SecurityManagerCompatibility;
+
+/**
+ * Utility class for performing reflection operations with proper
SecurityManager compliance.
+ * All reflection operations that may be restricted by a SecurityManager are
wrapped using
+ * our SecurityManagerCompatibility infrastructure, which handles both legacy
and modern
+ * Java versions where SecurityManager support has been deprecated/removed.
+ */
+public final class ReflectionSecurityUtils {
+
+ private ReflectionSecurityUtils() {
+ }
+
+ /**
+ * Gets the value of a private field in one operation.
+ * Useful when you need to access a field value once or infrequently.
+ *
+ * @param obj the object containing the field
+ * @param fieldName the name of the field to access
+ * @param <T> the expected type of the field value
+ * @return the field value cast to the expected type
+ * @throws RuntimeException if field access fails
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getPrivateFieldValue(Object obj, String fieldName) {
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ try {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(obj);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to access field '%s'", fieldName), e);
+ }
+ });
+ }
+
+ /**
+ * Gets a private field and makes it accessible for repeated use.
+ * Useful when we need to access a field multiple times (e.g., for metrics).
+ *
+ * @param obj the object containing the field
+ * @param fieldName the name of the field to access
+ * @return the accessible Field object
+ * @throws RuntimeException if field access fails
+ */
+ public static Field getAccessibleField(Object obj, String fieldName) {
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ try {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field;
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(
+ String.format("Failed to access private field '%s'", fieldName),
e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Unexpected exception accessing private field '%s'",
fieldName), e);
+ }
+ });
+ }
+
+ /**
+ * Gets the value from an already-accessible field.
+ * Use this with fields obtained from getAccessibleField() for performance
+ * when accessing the same field repeatedly.
+ *
+ * @param field the accessible field
+ * @param obj the object containing the field
+ * @param <T> the expected type of the field value
+ * @return the field value cast to the expected type
+ * @throws RuntimeException if field access fails
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getFieldValue(Field field, Object obj) {
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ try {
+ return (T) field.get(obj);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to read field value", e);
+ }
+ });
+ }
+
+ /**
+ * Gets a long value from an already-accessible field.
+ * Optimized version for primitive long fields to avoid boxing/unboxing.
+ *
+ * @param field the accessible field
+ * @param obj the object containing the field
+ * @return the long field value
+ * @throws RuntimeException if field access fails
+ */
+ public static long getLongFieldValue(Field field, Object obj) {
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ try {
+ return field.getLong(obj);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to read long field value", e);
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
new file mode 100644
index 000000000..2981c973d
--- /dev/null
+++
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestReplicationMetrics extends ReplicationTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationMetrics.class);
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+
+ @ClassRule
+ public static final MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .build());
+
+ private Long waitForTimestampAdvancement(
+ JobID jobId, Long previousTimestamp, long timeoutMillis) throws
Exception {
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + timeoutMillis;
+
+ while (System.currentTimeMillis() < endTime) {
+ Long currentTimestamp = getCurrentMetricsTimestamp(jobId);
+
+ if (currentTimestamp != null && currentTimestamp > previousTimestamp) {
+ long waitedMs = System.currentTimeMillis() - startTime;
+ LOG.info("Timestamp advanced from {} to {} (waited {}ms)",
+ previousTimestamp, currentTimestamp, waitedMs);
+ return currentTimestamp;
+ }
+ Thread.sleep(500);
+ }
+
+ // Timeout reached
+ Long finalTimestamp = getCurrentMetricsTimestamp(jobId);
+ throw new AssertionError(String.format(
+ "Timeout waiting for timestamp advancement. Previous: %s, Current:
%s, Waited: %dms",
+ previousTimestamp, finalTimestamp, timeoutMillis));
+ }
+
+ private Long getCurrentMetricsTimestamp(JobID jobId) throws Exception {
+ Map<String, Metric> metrics = reporter.getMetricsByIdentifiers(jobId);
+ Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics,
"lastEndTimestamp");
+
+ return lastEndTimestamp.map(Gauge::getValue).orElse(null);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Optional<Gauge<T>> findMetric(Map<String, Metric> metrics,
String nameSubstring) {
+ return metrics.entrySet().stream()
+ .filter(entry -> entry.getKey().contains(nameSubstring))
+ .map(Map.Entry::getValue)
+ .filter(metric -> metric instanceof Gauge)
+ .map(metric -> (Gauge<T>) metric)
+ .findFirst();
+ }
+
+ @After
+ public void cleanup() {
+ flinkCluster.cancelAllJobs();
+ }
+
+ @Test(timeout = 100000)
+ public void testMetricsPresence() throws Exception {
+ createAllTypesTable(sourceClient);
+ createAllTypesTable(sinkClient);
+
+ insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+ JobID jobId = envProvider.getEnv().executeAsync().getJobID();
+ waitForTimestampAdvancement(jobId, 0L, 15000);
+
+ Map<String, Metric> metrics = reporter.getMetricsByIdentifiers(jobId);
+
+ // Verify all expected metrics are present
+ Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics,
"lastEndTimestamp");
+ Optional<Gauge<Integer>> pendingCount = findMetric(metrics,
"pendingCount");
+ Optional<Gauge<Integer>> unassignedCount = findMetric(metrics,
"unassignedCount");
+
+ assertTrue("lastEndTimestamp metric should be present",
lastEndTimestamp.isPresent());
+ assertTrue("pendingCount metric should be present",
pendingCount.isPresent());
+ assertTrue("unassignedCount metric should be present",
unassignedCount.isPresent());
+
+ // Basic sanity checks
+ assertNotNull("lastEndTimestamp should have a value",
lastEndTimestamp.get().getValue());
+ assertNotNull("pendingCount should have a value",
pendingCount.get().getValue());
+ assertNotNull("unassignedCount should have a value",
unassignedCount.get().getValue());
+ }
+
+ @Test(timeout = 100000)
+ public void testTimestampProgression() throws Exception {
+ createAllTypesTable(sourceClient);
+ createAllTypesTable(sinkClient);
+
+ // Insert some data to process
+ insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+ JobID jobId = envProvider.getEnv().executeAsync().getJobID();
+
+ // Wait for initial timestamp to be set
+ Long ts1 = waitForTimestampAdvancement(jobId, 0L, 15000);
+ assertNotNull("Initial timestamp should be set", ts1);
+ assertTrue("Initial timestamp should be positive", ts1 > 0);
+
+ // Insert more data and wait for timestamp progression
+ insertRowsIntoAllTypesTable(sourceClient, 10, 10);
+ Long ts2 = waitForTimestampAdvancement(jobId, ts1, 10000);
+
+ // Insert more data and wait for another progression
+ insertRowsIntoAllTypesTable(sourceClient, 20, 10);
+ Long ts3 = waitForTimestampAdvancement(jobId, ts2, 10000);
+
+ // Assert monotonic increase
+ assertTrue("Timestamp should advance: ts2 > ts1", ts2 > ts1);
+ assertTrue("Timestamp should advance: ts3 > ts2", ts3 > ts2);
+ }
+}
diff --git a/java/kudu-replication/src/test/resources/log4j2-test.properties
b/java/kudu-replication/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..89d293589
--- /dev/null
+++ b/java/kudu-replication/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Override Flink's log4j2-test.properties that sets rootLogger.level = OFF
+
+status = error
+name = PropertiesConfig
+appenders = console
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+rootLogger.level = info
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+logger.kudu.name = org.apache.kudu
+logger.kudu.level = debug
\ No newline at end of file