This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 20bf67d03 KUDU-3662 [5/n] Add metrics support to replication
20bf67d03 is described below

commit 20bf67d0336c128d0b114e796be4c87b84ab746b
Author: Marton Greber <[email protected]>
AuthorDate: Mon Aug 4 17:51:30 2025 +0200

    KUDU-3662 [5/n] Add metrics support to replication
    
    Adds three key metrics to monitor replication job state:
    - lastEndTimestamp: closing timestamp for diff scan iterations
    - pending: number of splits currently being fetched by readers
    - unassigned: number of splits enumerated but not yet assigned
    
    Uses a reflection-based wrapper pattern to access private fields
    in KuduSourceEnumerator as a temporary workaround until the upstream
    Flink Kudu connector implements metrics support (FLINK-38187).
    
    The MetricWrappedKuduSource wraps the original KuduSource and
    delegates all operations while injecting a MetricWrappedKuduEnumerator
    that exposes internal state as Flink metrics. This approach allows
    clean removal once upstream metrics are available.
    
    Includes unit tests that verify metric existence and monotonic timestamp
    progression, though the pending and unassigned split counts are difficult
    to test reliably in the current test setup.
    
    Change-Id: Ibfbd34c707e7539ee88863399ae3061683f8bb3b
    Reviewed-on: http://gerrit.cloudera.org:8080/23246
    Reviewed-by: Alexey Serbin <[email protected]>
    Reviewed-by: Gabriella Lotz <[email protected]>
    Reviewed-by: Ashwani Raina <[email protected]>
    Tested-by: Marton Greber <[email protected]>
---
 java/config/spotbugs/excludeFilter.xml             |  10 ++
 java/kudu-replication/build.gradle                 |   2 +-
 .../kudu/replication/ReplicationEnvProvider.java   |  11 +-
 .../wrappedsource/MetricWrappedKuduEnumerator.java | 145 ++++++++++++++++++++
 .../wrappedsource/MetricWrappedKuduSource.java     | 103 ++++++++++++++
 .../wrappedsource/ReflectionSecurityUtils.java     | 122 +++++++++++++++++
 .../kudu/replication/TestReplicationMetrics.java   | 149 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  34 +++++
 8 files changed, 574 insertions(+), 2 deletions(-)

diff --git a/java/config/spotbugs/excludeFilter.xml 
b/java/config/spotbugs/excludeFilter.xml
index 11da3f997..91dde282c 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -375,4 +375,14 @@
         <Class name="org.apache.kudu.test.cluster.FakeDNS"/>
         <Bug pattern="DP_DO_INSIDE_DO_PRIVILEGED" />
     </Match>
+
+    <!-- kudu-replication exclusions -->
+    <Match>
+        <!-- SerialVersionUID is not needed as the original Flink source 
classes
+             also don't define it, and these are wrapper classes. -->
+        <Class name="~org\.apache\.kudu\.replication\.wrappedsource\..*"/>
+        <Or>
+            <Bug pattern="SE_NO_SERIALVERSIONID" />
+        </Or>
+    </Match>
 </FindBugsFilter>
diff --git a/java/kudu-replication/build.gradle 
b/java/kudu-replication/build.gradle
index 6c6c985d6..8d73c49aa 100644
--- a/java/kudu-replication/build.gradle
+++ b/java/kudu-replication/build.gradle
@@ -28,11 +28,11 @@ dependencies {
 
     testImplementation libs.junit
     testImplementation libs.assertj
-    testImplementation libs.junit
     testImplementation project(path: ":kudu-test-utils", configuration: 
"shadow")
     testImplementation libs.log4jApi
     testImplementation libs.log4jCore
     testImplementation libs.log4jSlf4jImpl
+    testImplementation libs.flinkTestUtils
 }
 
 shadowJar {
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
index f2691983c..672131303 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
@@ -20,15 +20,20 @@ import java.time.Duration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.kudu.connector.KuduTableInfo;
 import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connector.kudu.sink.KuduSink;
 import org.apache.flink.connector.kudu.sink.KuduSinkBuilder;
 import org.apache.flink.connector.kudu.source.KuduSource;
+import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 
+import org.apache.kudu.replication.wrappedsource.MetricWrappedKuduSource;
+
 /**
  * Provides a configured {@link StreamExecutionEnvironment} for the 
replication job.
  * This class does not execute the environment; it only prepares it.
@@ -63,13 +68,17 @@ public class ReplicationEnvProvider {
             
.setDiscoveryPeriod(Duration.ofSeconds(jobConfig.getDiscoveryIntervalSeconds()))
             .build();
 
+    // TODO(mgreber): remove this line once FLINK-38187 is resolved.
+    Source<Row, KuduSourceSplit, KuduSourceEnumeratorState> wrappedSource =
+            new MetricWrappedKuduSource<>(kuduSource);
+
     KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
             .setWriterConfig(writerConfig)
             .setTableInfo(KuduTableInfo.forTable(jobConfig.getSinkTableName()))
             .setOperationMapper(new CustomReplicationOperationMapper())
             .build();
 
-    env.fromSource(kuduSource, WatermarkStrategy.noWatermarks(), "KuduSource")
+    env.fromSource(wrappedSource, WatermarkStrategy.noWatermarks(), 
"KuduSource")
             .returns(TypeInformation.of(Row.class))
             .sinkTo(kuduSink);
 
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
new file mode 100644
index 000000000..b92c8af4c
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumerator;
+import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.metrics.Gauge;
+
+import org.apache.kudu.util.HybridTimeUtil;
+
+/**
+ * A wrapper around KuduSourceEnumerator that adds metrics support until 
metrics are implemented
+ * in the upstream Flink Kudu connector (FLINK-38187).
+ * Uses the delegate pattern - all SplitEnumerator methods are delegated to 
the wrapped
+ * KuduSourceEnumerator, while additionally exposing internal enumerator 
fields as metrics
+ * via reflection to provide visibility into split assignment and timestamp 
tracking.
+ * TODO(mgreber): remove this file once FLINK-38187 is resolved.
+ */
+public class MetricWrappedKuduEnumerator implements 
SplitEnumerator<KuduSourceSplit,
+                                                                    
KuduSourceEnumeratorState> {
+
+  private final KuduSourceEnumerator delegate;
+  private final SplitEnumeratorContext<KuduSourceSplit> context;
+
+  private final Field lastEndTimestampField;
+  private final Field pendingField;
+  private final Field unassignedField;
+
+  public MetricWrappedKuduEnumerator(
+          KuduTableInfo tableInfo,
+          KuduReaderConfig readerConfig,
+          Boundedness boundedness,
+          Duration discoveryInterval,
+          SplitEnumeratorContext<KuduSourceSplit> context,
+          @Nullable KuduSourceEnumeratorState restoredState
+  ) {
+    this.context = context;
+
+    if (restoredState != null) {
+      this.delegate = new KuduSourceEnumerator(
+              tableInfo, readerConfig, boundedness, discoveryInterval, 
context, restoredState);
+    } else {
+      this.delegate = new KuduSourceEnumerator(
+              tableInfo, readerConfig, boundedness, discoveryInterval, 
context);
+    }
+
+    // Initialize final fields using privileged access to reflection operations
+    this.lastEndTimestampField =
+        ReflectionSecurityUtils.getAccessibleField(delegate, 
"lastEndTimestamp");
+    this.pendingField = ReflectionSecurityUtils.getAccessibleField(delegate, 
"pending");
+    this.unassignedField = 
ReflectionSecurityUtils.getAccessibleField(delegate, "unassigned");
+
+  }
+
+
+
+  @Override
+  // Safe casts: lambdas return correct types for Gauge<Long> and 
Gauge<Integer>
+  @SuppressWarnings("unchecked")
+  public void start() {
+    context.metricGroup().gauge("lastEndTimestamp", (Gauge<Long>) 
this::getLastEndTimestamp);
+    context.metricGroup().gauge("pendingCount", (Gauge<Integer>) 
this::getPendingCount);
+    context.metricGroup().gauge("unassignedCount", (Gauge<Integer>) 
this::getUnassignedCount);
+    delegate.start();
+  }
+
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+    delegate.handleSplitRequest(subtaskId, requesterHostname);
+  }
+
+  @Override
+  public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+    delegate.addSplitsBack(splits, subtaskId);
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    delegate.addReader(subtaskId);
+  }
+
+  @Override
+  public KuduSourceEnumeratorState snapshotState(long checkpointId) throws 
Exception {
+    return delegate.snapshotState(checkpointId);
+  }
+
+  @Override
+  public void close() {
+    try {
+      delegate.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Error closing KuduSplitEnumerator", e);
+    }
+  }
+
+  @Override
+  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    delegate.handleSourceEvent(subtaskId, sourceEvent);
+  }
+
+  private long getLastEndTimestamp() {
+    long hybridTime = 
ReflectionSecurityUtils.getLongFieldValue(lastEndTimestampField, delegate);
+    long[] parsed = HybridTimeUtil.HTTimestampToPhysicalAndLogical(hybridTime);
+    long epochMicros = parsed[0];
+    return epochMicros / 1_000;
+  }
+
+  private int getPendingCount() {
+    List<KuduSourceSplit> pending = 
ReflectionSecurityUtils.getFieldValue(pendingField, delegate);
+    return pending.size();
+  }
+
+  private int getUnassignedCount() {
+    List<KuduSourceSplit> unassigned =
+        ReflectionSecurityUtils.getFieldValue(unassignedField, delegate);
+    return unassigned.size();
+  }
+}
+
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
new file mode 100644
index 000000000..a7ecc7ed5
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.time.Duration;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.source.KuduSource;
+import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A wrapper around KuduSource that adds metrics support until metrics are 
implemented
+ * in the upstream Flink Kudu connector (FLINK-38187).
+ * Uses the delegate pattern - all Source methods are delegated to the wrapped 
KuduSource,
+ * except for enumerator creation where we inject our own 
MetricWrappedKuduEnumerator
+ * to collect and expose metrics.
+ * TODO(mgreber): remove once FLINK-38187 is resolved.
+ */
+public class MetricWrappedKuduSource<OUT> implements Source<OUT,
+                                                            KuduSourceSplit,
+                                                            
KuduSourceEnumeratorState> {
+  private final KuduSource<OUT> delegate;
+  private final KuduReaderConfig readerConfig;
+  private final KuduTableInfo tableInfo;
+  private final Boundedness boundedness;
+  private final Duration discoveryInterval;
+
+  public MetricWrappedKuduSource(KuduSource<OUT> delegate) {
+    this.delegate = delegate;
+    this.boundedness = delegate.getBoundedness();
+
+    // We use reflection here for convenience - ideally we could plumb configs 
in the
+    // environment provider, but this wrapping approach allows us to simply 
supply a KuduSource
+    // to the wrapped source in a clean way. This makes it easier to remove 
the wrapper later
+    // when the metrics are implemented in the connector (FLINK-38187),
+    // as it's a drop-in replacement.
+    this.readerConfig = ReflectionSecurityUtils.getPrivateFieldValue(delegate, 
"readerConfig");
+    this.tableInfo = ReflectionSecurityUtils.getPrivateFieldValue(delegate, 
"tableInfo");
+    this.discoveryInterval =
+        ReflectionSecurityUtils.getPrivateFieldValue(delegate, 
"discoveryPeriod");
+  }
+
+
+
+  @Override
+  public Boundedness getBoundedness() {
+    return boundedness;
+  }
+
+  @Override
+  public SourceReader<OUT, KuduSourceSplit> createReader(
+          SourceReaderContext readerContext) throws Exception {
+    return delegate.createReader(readerContext);
+  }
+
+  @Override
+  public SplitEnumerator<KuduSourceSplit, KuduSourceEnumeratorState> 
createEnumerator(
+          SplitEnumeratorContext<KuduSourceSplit> context) {
+    return new MetricWrappedKuduEnumerator(
+            tableInfo, readerConfig, boundedness, discoveryInterval, context, 
null);
+  }
+
+  @Override
+  public SplitEnumerator<KuduSourceSplit, KuduSourceEnumeratorState> 
restoreEnumerator(
+          SplitEnumeratorContext<KuduSourceSplit> context,
+          KuduSourceEnumeratorState checkpoint) {
+    return new MetricWrappedKuduEnumerator(
+            tableInfo, readerConfig, boundedness, discoveryInterval, context, 
checkpoint);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<KuduSourceSplit> getSplitSerializer() {
+    return delegate.getSplitSerializer();
+  }
+
+  @Override
+  public SimpleVersionedSerializer<KuduSourceEnumeratorState> 
getEnumeratorCheckpointSerializer() {
+    return delegate.getEnumeratorCheckpointSerializer();
+  }
+}
+
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
new file mode 100644
index 000000000..71ef4e39c
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/ReflectionSecurityUtils.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.lang.reflect.Field;
+
+import org.apache.kudu.client.internals.SecurityManagerCompatibility;
+
+/**
+ * Utility class for performing reflection operations with proper 
SecurityManager compliance.
+ * All reflection operations that may be restricted by a SecurityManager are 
wrapped using
+ * our SecurityManagerCompatibility infrastructure, which handles both legacy 
and modern
+ * Java versions where SecurityManager support has been deprecated/removed.
+ */
+public final class ReflectionSecurityUtils {
+
+  private ReflectionSecurityUtils() {
+  }
+
+  /**
+   * Gets the value of a private field in one operation.
+   * Useful when you need to access a field value once or infrequently.
+   *
+   * @param obj the object containing the field
+   * @param fieldName the name of the field to access
+   * @param <T> the expected type of the field value
+   * @return the field value cast to the expected type
+   * @throws RuntimeException if field access fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T getPrivateFieldValue(Object obj, String fieldName) {
+    return SecurityManagerCompatibility.get().doPrivileged(() -> {
+      try {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(obj);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Failed to access field '%s'", fieldName), e);
+      }
+    });
+  }
+
+  /**
+   * Gets a private field and makes it accessible for repeated use.
+   * Useful when we need to access a field multiple times (e.g., for metrics).
+   *
+   * @param obj the object containing the field
+   * @param fieldName the name of the field to access
+   * @return the accessible Field object
+   * @throws RuntimeException if field access fails
+   */
+  public static Field getAccessibleField(Object obj, String fieldName) {
+    return SecurityManagerCompatibility.get().doPrivileged(() -> {
+      try {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field;
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(
+            String.format("Failed to access private field '%s'", fieldName), 
e);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Unexpected exception accessing private field '%s'", 
fieldName), e);
+      }
+    });
+  }
+
+  /**
+   * Gets the value from an already-accessible field.
+   * Use this with fields obtained from getAccessibleField() for performance
+   * when accessing the same field repeatedly.
+   *
+   * @param field the accessible field
+   * @param obj the object containing the field
+   * @param <T> the expected type of the field value
+   * @return the field value cast to the expected type
+   * @throws RuntimeException if field access fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T getFieldValue(Field field, Object obj) {
+    return SecurityManagerCompatibility.get().doPrivileged(() -> {
+      try {
+        return (T) field.get(obj);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read field value", e);
+      }
+    });
+  }
+
+  /**
+   * Gets a long value from an already-accessible field.
+   * Optimized version for primitive long fields to avoid boxing/unboxing.
+   *
+   * @param field the accessible field
+   * @param obj the object containing the field
+   * @return the long field value
+   * @throws RuntimeException if field access fails
+   */
+  public static long getLongFieldValue(Field field, Object obj) {
+    return SecurityManagerCompatibility.get().doPrivileged(() -> {
+      try {
+        return field.getLong(obj);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read long field value", e);
+      }
+    });
+  }
+}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
new file mode 100644
index 000000000..2981c973d
--- /dev/null
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestReplicationMetrics extends ReplicationTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationMetrics.class);
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+
+  @ClassRule
+  public static final MiniClusterWithClientResource flinkCluster =
+          new MiniClusterWithClientResource(
+                  new MiniClusterResourceConfiguration.Builder()
+                          .setNumberSlotsPerTaskManager(1)
+                          .setNumberTaskManagers(1)
+                          .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+                          .build());
+
+  private Long waitForTimestampAdvancement(
+          JobID jobId, Long previousTimestamp, long timeoutMillis) throws 
Exception {
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + timeoutMillis;
+
+    while (System.currentTimeMillis() < endTime) {
+      Long currentTimestamp = getCurrentMetricsTimestamp(jobId);
+
+      if (currentTimestamp != null && currentTimestamp > previousTimestamp) {
+        long waitedMs = System.currentTimeMillis() - startTime;
+        LOG.info("Timestamp advanced from {} to {} (waited {}ms)",
+                 previousTimestamp, currentTimestamp, waitedMs);
+        return currentTimestamp;
+      }
+      Thread.sleep(500);
+    }
+
+    // Timeout reached
+    Long finalTimestamp = getCurrentMetricsTimestamp(jobId);
+    throw new AssertionError(String.format(
+            "Timeout waiting for timestamp advancement. Previous: %s, Current: 
%s, Waited: %dms",
+            previousTimestamp, finalTimestamp, timeoutMillis));
+  }
+
+  private Long getCurrentMetricsTimestamp(JobID jobId) throws Exception {
+    Map<String, Metric> metrics = reporter.getMetricsByIdentifiers(jobId);
+    Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics, 
"lastEndTimestamp");
+
+    return lastEndTimestamp.map(Gauge::getValue).orElse(null);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Optional<Gauge<T>> findMetric(Map<String, Metric> metrics, 
String nameSubstring) {
+    return metrics.entrySet().stream()
+            .filter(entry -> entry.getKey().contains(nameSubstring))
+            .map(Map.Entry::getValue)
+            .filter(metric -> metric instanceof Gauge)
+            .map(metric -> (Gauge<T>) metric)
+            .findFirst();
+  }
+
+  @After
+  public void cleanup() {
+    flinkCluster.cancelAllJobs();
+  }
+
+  @Test(timeout = 100000)
+  public void testMetricsPresence() throws Exception {
+    createAllTypesTable(sourceClient);
+    createAllTypesTable(sinkClient);
+
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+    JobID jobId = envProvider.getEnv().executeAsync().getJobID();
+    waitForTimestampAdvancement(jobId, 0L, 15000);
+
+    Map<String, Metric> metrics = reporter.getMetricsByIdentifiers(jobId);
+
+    // Verify all expected metrics are present
+    Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics, 
"lastEndTimestamp");
+    Optional<Gauge<Integer>> pendingCount = findMetric(metrics, 
"pendingCount");
+    Optional<Gauge<Integer>> unassignedCount = findMetric(metrics, 
"unassignedCount");
+
+    assertTrue("lastEndTimestamp metric should be present", 
lastEndTimestamp.isPresent());
+    assertTrue("pendingCount metric should be present", 
pendingCount.isPresent());
+    assertTrue("unassignedCount metric should be present", 
unassignedCount.isPresent());
+
+    // Basic sanity checks
+    assertNotNull("lastEndTimestamp should have a value", 
lastEndTimestamp.get().getValue());
+    assertNotNull("pendingCount should have a value", 
pendingCount.get().getValue());
+    assertNotNull("unassignedCount should have a value", 
unassignedCount.get().getValue());
+  }
+
+  @Test(timeout = 100000)
+  public void testTimestampProgression() throws Exception {
+    createAllTypesTable(sourceClient);
+    createAllTypesTable(sinkClient);
+
+    // Insert some data to process
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+    JobID jobId = envProvider.getEnv().executeAsync().getJobID();
+
+    // Wait for initial timestamp to be set
+    Long ts1 = waitForTimestampAdvancement(jobId, 0L, 15000);
+    assertNotNull("Initial timestamp should be set", ts1);
+    assertTrue("Initial timestamp should be positive", ts1 > 0);
+
+    // Insert more data and wait for timestamp progression
+    insertRowsIntoAllTypesTable(sourceClient, 10, 10);
+    Long ts2 = waitForTimestampAdvancement(jobId, ts1, 10000);
+
+    // Insert more data and wait for another progression
+    insertRowsIntoAllTypesTable(sourceClient, 20, 10);
+    Long ts3 = waitForTimestampAdvancement(jobId, ts2, 10000);
+
+    // Assert monotonic increase
+    assertTrue("Timestamp should advance: ts2 > ts1", ts2 > ts1);
+    assertTrue("Timestamp should advance: ts3 > ts2", ts3 > ts2);
+  }
+}
diff --git a/java/kudu-replication/src/test/resources/log4j2-test.properties 
b/java/kudu-replication/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..89d293589
--- /dev/null
+++ b/java/kudu-replication/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Override Flink's log4j2-test.properties that sets rootLogger.level = OFF
+
+status = error
+name = PropertiesConfig
+appenders = console
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+rootLogger.level = info
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+logger.kudu.name = org.apache.kudu
+logger.kudu.level = debug
\ No newline at end of file

Reply via email to