This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c2af095 CASSSIDECAR-216: Capture Metrics for Schema Reporting (#204)
2c2af095 is described below

commit 2c2af0954472b2dc83e52954bd4fa745659b804c
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Tue Mar 18 11:52:58 2025 -0500

    CASSSIDECAR-216: Capture Metrics for Schema Reporting (#204)
    
    Patch by Yuriy Semchyshyn; Reviewed by Francisco Guerrero, Saranya 
Krishnakumar, Yifan Cai for CASSSIDECAR-216
---
 CHANGES.txt                                        |   1 +
 .../sidecar/datahub/IdentifiersProvider.java       |  34 +++++-
 .../cassandra/sidecar/datahub/SchemaReporter.java  |  64 ++++++++--
 .../sidecar/datahub/SchemaReportingTask.java       |   2 +-
 .../sidecar/handlers/ReportSchemaHandler.java      |   2 +-
 .../cassandra/sidecar/metrics/DeltaGauge.java      |  12 +-
 .../cassandra/sidecar/metrics/ServerMetrics.java   |   6 +
 .../sidecar/metrics/ServerMetricsImpl.java         |   9 ++
 .../metrics/server/SchemaReportingMetrics.java     |  73 +++++++++++
 .../datahub/SchemaReporterIntegrationTest.java     |  47 ++++++-
 .../sidecar/datahub/IdentifiersProviderTest.java   | 136 +++++++++++++++++++++
 .../sidecar/datahub/SchemaReporterTest.java        | 112 +++++++++++++----
 .../cassandra/sidecar/metrics/DeltaGaugeTest.java  |  17 +++
 13 files changed, 462 insertions(+), 53 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 90497fa8..baef5ee7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Capture Metrics for Schema Reporting (CASSSIDECAR-216)
  * SidecarInstanceCodec is failing to find codec for type (CASSSIDECAR-229)
  * Retry Failed Schema Reports (CASSSIDECAR-217)
  * Guice modularization (CASSSIDECAR-208)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
index e90f4ca4..189d5189 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.TableMetadata;
@@ -32,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * An abstract class that has to be extended and instantiated for every 
Cassandra
- * cluster that needs its schema converted into a DataHub-compliant format.
+ * cluster that needs its schema converted into a DataHub-compliant format
  */
 public abstract class IdentifiersProvider
 {
@@ -42,7 +43,26 @@ public abstract class IdentifiersProvider
     protected static final String DATA_PLATFORM_INSTANCE = 
"dataPlatformInstance";
     protected static final String CONTAINER = "container";
     protected static final String DATASET = "dataset";
-    protected static final String PROD = "PROD";  // DataHub requires this to 
be {@code PROD} regardless
+    protected static final String PROD = "PROD";  // DataHub requires this to 
be {@code PROD}
+
+    protected static final ToStringStyle STYLE = new ToStringStyle()
+    {{
+        setUseShortClassName(false);
+        setUseClassName(true);
+        setUseIdentityHashCode(false);
+        setUseFieldNames(false);
+        setContentStart("(");
+        setFieldSeparatorAtStart(false);
+        setFieldSeparator(",");
+        setFieldSeparatorAtEnd(false);
+        setContentEnd(")");
+        setDefaultFullDetail(true);
+        setArrayContentDetail(true);
+        setArrayStart("(");
+        setArraySeparator(",");
+        setArrayEnd(")");
+        setNullText("null");
+    }};
 
     /**
      * A public getter method that returns the name of Cassandra Organization
@@ -231,13 +251,19 @@ public abstract class IdentifiersProvider
     @NotNull
     public String toString()
     {
-        return new ToStringBuilder(this)
+        return new ToStringBuilder(this, STYLE)
                 .append(this.organization())
                 .append(this.platform())
                 .append(this.environment())
                 .append(this.application())
                 .append(this.cluster())
                 .append(this.identifier())
-                .toString();
+                .toString()
+                .replaceAll("\\s", "");
+
+        // Use of a custom {@link ToStringStyle} implementation prevents the 
hash code from being
+        // included into the {@link String} representation; which, in 
conjunction with the removal
+        // of all whitespace characters, simplifies extraction of {@link 
IdentifiersProvider}
+        // objects from the execution logs; without negatively affecting the 
readability much
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
index 4ad3e3f3..8e8d7fcc 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
@@ -23,7 +23,10 @@ import java.util.List;
 import java.util.stream.Stream;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Timer;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.Metadata;
@@ -34,6 +37,9 @@ import com.linkedin.data.template.RecordTemplate;
 import datahub.client.Emitter;
 import datahub.event.MetadataChangeProposalWrapper;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+import org.apache.cassandra.sidecar.metrics.DeltaGauge;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -47,6 +53,8 @@ import org.jetbrains.annotations.NotNull;
 @Singleton
 public class SchemaReporter
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaReporter.class);
+
     @NotNull
     protected final IdentifiersProvider identifiersProvider;
     @NotNull
@@ -57,6 +65,8 @@ public class SchemaReporter
     protected final List<TableToAspectConverter<? extends RecordTemplate>> 
tableConverters;
     @NotNull
     protected final EmitterFactory emitterFactory;
+    @NotNull
+    protected final SchemaReportingMetrics reportingMetrics;
 
     /**
      * The public constructor that instantiates {@link SchemaReporter} with 
default configuration.
@@ -66,10 +76,12 @@ public class SchemaReporter
      *
      * @param identifiersProvider an instance of {@link IdentifiersProvider} 
to use
      * @param emitterFactory an instance of {@link EmitterFactory} to use
+     * @param sidecarMetrics an instance of {@link SidecarMetrics} to obtain 
{@link SchemaReportingMetrics} from
      */
     @Inject
     public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
-                          @NotNull EmitterFactory emitterFactory)
+                          @NotNull EmitterFactory emitterFactory,
+                          @NotNull SidecarMetrics sidecarMetrics)
     {
         this(identifiersProvider,
              ImmutableList.of(new 
ClusterToDataPlatformInfoConverter(identifiersProvider),
@@ -85,7 +97,8 @@ public class SchemaReporter
                               new 
TableToDataPlatformInstanceConverter(identifiersProvider),
                               new 
TableToBrowsePathsV2Converter(identifiersProvider),
                               new 
TableToBrowsePathsConverter(identifiersProvider)),
-             emitterFactory);
+             emitterFactory,
+             sidecarMetrics.server().schemaReporting());
     }
 
     /**
@@ -96,44 +109,73 @@ public class SchemaReporter
      * @param keyspaceConverters a {@link List} of {@link 
KeyspaceToAspectConverter} instances to use
      * @param tableConverters a {@link List} of {@link TableToAspectConverter} 
instances to use
      * @param emitterFactory an instance of {@link EmitterFactory} to use
+     * @param reportingMetrics an instance of {@link SchemaReportingMetrics} 
to use
      */
     protected SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
                              @NotNull List<ClusterToAspectConverter<? extends 
RecordTemplate>> clusterConverters,
                              @NotNull List<KeyspaceToAspectConverter<? extends 
RecordTemplate>> keyspaceConverters,
                              @NotNull List<TableToAspectConverter<? extends 
RecordTemplate>> tableConverters,
-                             @NotNull EmitterFactory emitterFactory)
+                             @NotNull EmitterFactory emitterFactory,
+                             @NotNull SchemaReportingMetrics reportingMetrics)
     {
         this.identifiersProvider = identifiersProvider;
         this.clusterConverters = clusterConverters;
         this.keyspaceConverters = keyspaceConverters;
         this.tableConverters = tableConverters;
         this.emitterFactory = emitterFactory;
+        this.reportingMetrics = reportingMetrics;
     }
 
     /**
-     * Public method for converting and reporting the Cassandra schema
+     * Public method for converting and reporting the Cassandra schema when 
triggered by a scheduled periodic task
      *
      * @param cluster the {@link Cluster} to extract Cassandra schema from
      */
-    public void process(@NotNull Cluster cluster)
+    public void processScheduled(@NotNull Cluster cluster)
+    {
+        process(cluster.getMetadata(), 
reportingMetrics.startedSchedule.metric);
+    }
+
+    /**
+     * Public method for converting and reporting the Cassandra schema when 
triggered by a received API request
+     *
+     * @param metadata the {@link Metadata} to extract Cassandra schema from
+     */
+    public void processRequested(@NotNull Metadata metadata)
     {
-        process(cluster.getMetadata());
+        process(metadata, reportingMetrics.startedRequest.metric);
     }
 
     /**
-     * Public method for converting and reporting the Cassandra schema
+     * Private method for converting and reporting the Cassandra schema
      *
      * @param metadata the {@link Metadata} to extract Cassandra schema from
+     * @param started the {@link DeltaGauge} for the metric counting 
invocations
      */
-    public void process(@NotNull Metadata metadata)
+    private void process(@NotNull Metadata metadata,
+                         @NotNull DeltaGauge started)
     {
+        LOGGER.info("Starting to report schema for cluster, identifiers={}", 
identifiersProvider);
+        started.increment();
+
         try (Emitter emitter = emitterFactory.emitter())
         {
-            stream(metadata).forEach(ThrowableUtils.consumer(emitter::emit));
+            Timer.Context timer = reportingMetrics.totalDuration.metric.time();
+            long aspects = 
stream(metadata).map(ThrowableUtils.function(emitter::emit))
+                                           .count();
+
+            timer.close();
+            reportingMetrics.sizeAspects.metric.update(aspects);
+            reportingMetrics.finishedSuccess.metric.increment();
+            LOGGER.info("Successfully reported schema for cluster, 
identifiers={}", identifiersProvider);
         }
         catch (Exception exception)
         {
-            throw new RuntimeException("Cannot extract schema for cluster " + 
identifiersProvider.cluster(), exception);
+            reportingMetrics.finishedFailure.metric.increment();
+            LOGGER.error("Failed to report schema for cluster, 
identifiers={}", identifiersProvider);
+
+            throw new RuntimeException("Failed to report schema for cluster 
with identifiers " + identifiersProvider,
+                                       exception);
         }
     }
 
@@ -145,7 +187,6 @@ public class SchemaReporter
      * @return non-empty {@link Stream} of DataHub aspects
      */
     @NotNull
-    @SuppressWarnings("UnstableApiUsage")
     protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> 
stream(@NotNull Metadata metadata)
     {
         return Streams.concat(
@@ -165,7 +206,6 @@ public class SchemaReporter
      * @return non-empty {@link Stream} of DataHub aspects
      */
     @NotNull
-    @SuppressWarnings("UnstableApiUsage")
     protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> 
stream(@NotNull KeyspaceMetadata keyspace)
     {
         return Streams.concat(
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
index 00970c41..d91dd8fb 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
@@ -108,7 +108,7 @@ public class SchemaReportingTask implements PeriodicTask, 
ExecuteOnClusterLeaseh
     {
         try
         {
-            reporter.process(session.get().getCluster());
+            reporter.processScheduled(session.get().getCluster());
             LOGGER.info("Schema report has been completed successfully on 
attempt {}", attempt);
             promise.complete();
         }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
index 150ef07e..7a009b38 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
@@ -98,7 +98,7 @@ public class ReportSchemaHandler extends 
AbstractHandler<Void> implements Access
         Metadata metadata = 
metadataFetcher.callOnFirstAvailableInstance(instance -> 
instance.delegate().metadata());
 
         executorPools.service()
-                     .runBlocking(() -> schemaReporter.process(metadata))
+                     .runBlocking(() -> 
schemaReporter.processRequested(metadata))
                      .onSuccess(ignored -> context.json(OK_STATUS))
                      .onFailure(throwable -> processFailure(throwable, 
context, host, address, request));
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
index aae5bd66..b2ae14a6 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
@@ -33,7 +33,15 @@ public class DeltaGauge implements Gauge<Long>, Metric
 
     public DeltaGauge()
     {
-        this.count = new AtomicLong();
+        this.count = new AtomicLong(0L);
+    }
+
+    /**
+     * Increments the cumulative value tracked by this {@link DeltaGauge}
+     */
+    public void increment()
+    {
+        count.incrementAndGet();
     }
 
     /**
@@ -55,6 +63,6 @@ public class DeltaGauge implements Gauge<Long>, Metric
     @Override
     public Long getValue()
     {
-        return count.getAndSet(0);
+        return count.getAndSet(0L);
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
index 0fb32eb4..ea8d7857 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.metrics;
 
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
 
 import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX;
 
@@ -49,6 +50,11 @@ public interface ServerMetrics
      */
     SchemaMetrics schema();
 
+    /**
+     * @return metrics for the schema reporting
+     */
+    SchemaReportingMetrics schemaReporting();
+
     /**
      * @return metrics related to internal caches that are tracked.
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
index 75b0a248..15e31e6a 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.metrics;
 import java.util.Objects;
 
 import com.codahale.metrics.MetricRegistry;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
 
 /**
  * {@link ServerMetrics} tracks metrics related to Sidecar server.
@@ -32,6 +33,7 @@ public class ServerMetricsImpl implements ServerMetrics
     protected final ResourceMetrics resourceMetrics;
     protected final RestoreMetrics restoreMetrics;
     protected final SchemaMetrics schemaMetrics;
+    protected final SchemaReportingMetrics schemaReportingMetrics;
     protected final CacheMetrics cacheMetrics;
     protected final CoordinationMetrics coordinationMetrics;
 
@@ -43,6 +45,7 @@ public class ServerMetricsImpl implements ServerMetrics
         this.resourceMetrics = new ResourceMetrics(metricRegistry);
         this.restoreMetrics = new RestoreMetrics(metricRegistry);
         this.schemaMetrics = new SchemaMetrics(metricRegistry);
+        this.schemaReportingMetrics = new 
SchemaReportingMetrics(metricRegistry);
         this.cacheMetrics = new CacheMetrics(metricRegistry);
         this.coordinationMetrics = new CoordinationMetrics(metricRegistry);
     }
@@ -71,6 +74,12 @@ public class ServerMetricsImpl implements ServerMetrics
         return schemaMetrics;
     }
 
+    @Override
+    public SchemaReportingMetrics schemaReporting()
+    {
+        return schemaReportingMetrics;
+    }
+
     @Override
     public CacheMetrics cache()
     {
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
new file mode 100644
index 00000000..1b576ce5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.metrics.server;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.sidecar.metrics.DeltaGauge;
+import org.apache.cassandra.sidecar.metrics.NamedMetric;
+import org.apache.cassandra.sidecar.metrics.ServerMetrics;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Tracks metrics for the schema reporting done by Sidecar
+ */
+public class SchemaReportingMetrics
+{
+    protected static final String DOMAIN = ServerMetrics.SERVER_PREFIX + 
".SchemaReporting";
+
+    public final NamedMetric<DeltaGauge> startedRequest;
+    public final NamedMetric<DeltaGauge> startedSchedule;
+    public final NamedMetric<DeltaGauge> finishedSuccess;
+    public final NamedMetric<DeltaGauge> finishedFailure;
+    public final NamedMetric<Histogram> sizeAspects;
+    public final NamedMetric<Timer> totalDuration;
+
+    public SchemaReportingMetrics(@NotNull MetricRegistry registry)
+    {
+        startedSchedule = NamedMetric.builder(name -> registry.gauge(name, 
DeltaGauge::new))
+                                     .withDomain(DOMAIN)
+                                     .withName("Scheduled")
+                                     .build();
+        startedRequest = NamedMetric.builder(name -> registry.gauge(name, 
DeltaGauge::new))
+                                    .withDomain(DOMAIN)
+                                    .withName("Requested")
+                                    .build();
+
+        finishedSuccess = NamedMetric.builder(name -> registry.gauge(name, 
DeltaGauge::new))
+                                     .withDomain(DOMAIN)
+                                     .withName("Succeeded")
+                                     .build();
+        finishedFailure = NamedMetric.builder(name -> registry.gauge(name, 
DeltaGauge::new))
+                                     .withDomain(DOMAIN)
+                                     .withName("Failed")
+                                     .build();
+
+        sizeAspects = NamedMetric.builder(registry::histogram)
+                                 .withDomain(DOMAIN)
+                                 .withName("Aspects")
+                                 .build();
+
+        totalDuration = NamedMetric.builder(registry::timer)
+                                   .withDomain(DOMAIN)
+                                   .withName("Duration")
+                                   .build();
+    }
+}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
index 9650e87b..eaa4fdcb 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
@@ -21,12 +21,20 @@ package org.apache.cassandra.sidecar.datahub;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 
+import com.codahale.metrics.SharedMetricRegistries;
 import com.datastax.driver.core.Session;
 import com.linkedin.data.DataList;
 import com.linkedin.data.codec.JacksonDataCodec;
 import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 import org.jetbrains.annotations.NotNull;
@@ -41,6 +49,23 @@ final class SchemaReporterIntegrationTest extends 
IntegrationTestBase
 {
     private static final IdentifiersProvider IDENTIFIERS = new 
TestIdentifiers();
     private static final JacksonDataCodec CODEC = new JacksonDataCodec();
+    private static final MetricRegistryFactory FACTORY = new 
MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(),
+                                                                               
    Collections.emptyList(),
+                                                                               
    Collections.emptyList());
+
+    private SidecarMetrics metrics;
+
+    @BeforeEach
+    void beforeEach()
+    {
+        metrics = new SidecarMetricsImpl(FACTORY, null);
+    }
+
+    @AfterEach
+    void afterEach()
+    {
+        SharedMetricRegistries.clear();
+    }
 
     /**
      * Private helper method that removes all numeric suffixes added 
non-deterministically
@@ -97,18 +122,28 @@ final class SchemaReporterIntegrationTest extends 
IntegrationTestBase
         JsonEmitter emitter = new JsonEmitter();
         try (Session session = maybeGetSession())
         {
-            new SchemaReporter(IDENTIFIERS, () -> 
emitter).process(session.getCluster());
+            new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processScheduled(session.getCluster());
         }
-        String   actualJson = normalizeNames(emitter.content());
+        String actualJson = normalizeNames(emitter.content());
         String expectedJson = 
IOUtils.readFully("/datahub/integration_test.json");
-
         assertThat(actualJson).isEqualToNormalizingWhitespace(expectedJson);
 
-        // Finally, make sure the returned schema produces the same tree of
+        // Second, make sure the returned schema produces the same tree of
         // DataHub objects after having been normalized and deserialized
-        DataList   actualData = CODEC.readList(new StringReader(actualJson));
+        DataList actualData = CODEC.readList(new StringReader(actualJson));
         DataList expectedData = CODEC.readList(new StringReader(expectedJson));
-
         assertThat(actualData).isEqualTo(expectedData);
+        
+        // Third, validate the captured metrics: one execution triggered by 
the schedule and
+        // completed successfully, with thirteen aspects produced in zero or 
more nanoseconds
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();
+        assertThat(metrics.startedRequest.metric.getValue()).isZero();
+        assertThat(metrics.startedSchedule.metric.getValue()).isOne();
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
     }
 }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
new file mode 100644
index 00000000..949e79f4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.datahub;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+import org.jetbrains.annotations.NotNull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link IdentifiersProvider}
+ */
+final class IdentifiersProviderTest
+{
+    private static final IdentifiersProvider IDENTIFIERS = new 
TestIdentifiers();  // Single instance of immutable class
+
+    /**
+     * Tests the default values for the individual components of the composite 
identifier
+     */
+    @Test
+    void testValues()
+    {
+        String organization = "Cassandra";
+        String platform = "cassandra";
+        String environment = "ENVIRONMENT";
+        String application = "application";
+        String cluster = "cluster";
+
+        assertThat(IDENTIFIERS.organization()).isEqualTo(organization);
+        assertThat(IDENTIFIERS.platform()).isEqualTo(platform);
+        assertThat(IDENTIFIERS.environment()).isEqualTo(environment);
+        assertThat(IDENTIFIERS.application()).isEqualTo(application);
+        assertThat(IDENTIFIERS.cluster()).isEqualTo(cluster);
+    }
+
+    /**
+     * Tests the generation algorithm for the unique identifier of the 
composite identifier
+     */
+    @Test
+    void testIdentifier()
+    {
+        UUID identifier = 
UUID.fromString("ace3ba6b-49b2-3dd5-955a-1de13730188b");  // Calculated 
deterministically
+
+        assertThat(IDENTIFIERS.identifier()).isEqualTo(identifier);
+    }
+
+    /**
+     * Tests the generation formats for the URN identifiers of the composite 
identifier
+     */
+    @Test
+    void testUrns()
+    {
+        KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
+        TableMetadata table = mock(TableMetadata.class);
+        when(keyspace.getName()).thenReturn("keyspace");
+        when(table.getName()).thenReturn("table");
+        when(keyspace.getTables()).thenReturn(Collections.singleton(table));
+        when(table.getKeyspace()).thenReturn(keyspace);
+
+        String urnDataPlatform = "urn:li:dataPlatform:cassandra";
+        String urnDataPlatformInstance = "urn:li:dataPlatformInstance:" +
+                                         
"(urn:li:dataPlatform:cassandra,ace3ba6b-49b2-3dd5-955a-1de13730188b)";
+        String urnContainer = 
"urn:li:container:ace3ba6b-49b2-3dd5-955a-1de13730188b_keyspace";
+        String urnDataset = "urn:li:dataset:" +
+                            
"(urn:li:dataPlatform:cassandra,ace3ba6b-49b2-3dd5-955a-1de13730188b.keyspace.table,PROD)";
+
+        assertThat(IDENTIFIERS.urnDataPlatform()).isEqualTo(urnDataPlatform);
+        
assertThat(IDENTIFIERS.urnDataPlatformInstance()).isEqualTo(urnDataPlatformInstance);
+        assertThat(IDENTIFIERS.urnContainer(keyspace)).isEqualTo(urnContainer);
+        assertThat(IDENTIFIERS.urnDataset(table)).isEqualTo(urnDataset);
+    }
+
+    /**
+     * Tests the method implementations of {@link IdentifiersProvider#equals} 
and {@link IdentifiersProvider#hashCode}
+     */
+    @Test
+    void testEqualsAndHashCode()
+    {
+        IdentifiersProvider same = new IdentifiersProvider()
+        {
+            @Override
+            @NotNull
+            public String cluster()
+            {
+                return "cluster";
+            }
+        };
+        IdentifiersProvider different = new IdentifiersProvider()
+        {
+            @Override
+            @NotNull
+            public String cluster()
+            {
+                return "qwerty";
+            }
+        };
+
+        assertThat(IDENTIFIERS).isEqualTo(same).isNotEqualTo(different);
+        
assertThat(IDENTIFIERS).hasSameHashCodeAs(same).doesNotHaveSameHashCodeAs(different);
+    }
+
+    /**
+     * Tests the method implementation of {@link IdentifiersProvider#toString}
+     */
+    @Test
+    void testToString()
+    {
+        String string = "org.apache.cassandra.sidecar.datahub.TestIdentifiers" 
+
+                        
"(Cassandra,cassandra,ENVIRONMENT,application,cluster,ace3ba6b-49b2-3dd5-955a-1de13730188b)";
+
+        assertThat(IDENTIFIERS).hasToString(string);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
index 7a7b0f2f..387aecdc 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
@@ -22,8 +22,11 @@ package org.apache.cassandra.sidecar.datahub;
 import java.io.IOException;
 import java.util.Collections;
 import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.codahale.metrics.SharedMetricRegistries;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.DataType;
@@ -33,8 +36,12 @@ import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.TableOptionsMetadata;
 import com.datastax.driver.core.UserType;
 import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,56 +52,82 @@ import static org.mockito.Mockito.when;
 final class SchemaReporterTest
 {
     private static final IdentifiersProvider IDENTIFIERS = new 
TestIdentifiers();
+    private static final MetricRegistryFactory FACTORY = new 
MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(),
+                                                                               
    Collections.emptyList(),
+                                                                               
    Collections.emptyList());
+
+    private SidecarMetrics metrics;
+
+    @BeforeEach
+    void beforeEach()
+    {
+        metrics = new SidecarMetricsImpl(FACTORY, null); // new metrics instance before every test, built from the shared FACTORY; second constructor argument is passed as null
+    }
+
+    @AfterEach
+    void afterEach()
+    {
+        SharedMetricRegistries.clear(); // drops the shared registries after every test - presumably so captured metric values do not leak into the next test
+    }
 
     @Test
     void testEmptyCluster() throws IOException
     {
-        Cluster cluster = mock(Cluster.class);
-        Metadata metadata = mock (Metadata.class);
-        when(cluster.getClusterName()).thenReturn("sample_cluster");
-        when(cluster.getMetadata()).thenReturn(metadata);
+        Metadata metadata = mock(Metadata.class);
         when(metadata.getKeyspaces()).thenReturn(Collections.emptyList());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processRequested(metadata);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_cluster.json");
+        assertThat(actual).isEqualTo(expected);
 
-        assertEquals(expected, actual);
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();                      // Validate 
captured metrics:
+        assertThat(metrics.startedRequest.metric.getValue()).isOne();          
                        //  * one execution triggered by request
+        assertThat(metrics.startedSchedule.metric.getValue()).isZero();        
                        //  * zero executions triggered by schedule
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();         
                        //  * one execution resulted in success
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();        
                        //  * zero executions resulted in failure
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();             
                        //  * single number of aspects,
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(2L);
          //    equal to two
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();           
                 //  * single duration of execution,
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
  //    that is non-negative
     }
 
     @Test
     void testEmptyKeyspace() throws IOException
     {
-        Cluster cluster = mock(Cluster.class);
-        Metadata metadata = mock (Metadata.class);
+        Metadata metadata = mock(Metadata.class);
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
-        when(cluster.getClusterName()).thenReturn("sample_cluster");
-        when(cluster.getMetadata()).thenReturn(metadata);
         
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
         when(keyspace.getName()).thenReturn("sample_keyspace");
         when(keyspace.getTables()).thenReturn(Collections.emptyList());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processRequested(metadata);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_keyspace.json");
+        assertThat(actual).isEqualTo(expected);
 
-        assertEquals(expected, actual);
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();                      // Validate 
captured metrics:
+        assertThat(metrics.startedRequest.metric.getValue()).isOne();          
                        //  * one execution triggered by request
+        assertThat(metrics.startedSchedule.metric.getValue()).isZero();        
                        //  * zero executions triggered by schedule
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();         
                        //  * one execution resulted in success
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();        
                        //  * zero executions resulted in failure
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();             
                        //  * single number of aspects,
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(6L);
          //    equal to six
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();           
                 //  * single duration of execution,
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
  //    that is non-negative
     }
 
     @Test
     void testEmptyTable() throws IOException
     {
-        Cluster cluster = mock(Cluster.class);
-        Metadata metadata = mock (Metadata.class);
+        Metadata metadata = mock(Metadata.class);
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
         TableMetadata table = mock(TableMetadata.class);
         TableOptionsMetadata options = mock(TableOptionsMetadata.class);
-        when(cluster.getClusterName()).thenReturn("sample_cluster");
-        when(cluster.getMetadata()).thenReturn(metadata);
         
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
         when(keyspace.getName()).thenReturn("sample_keyspace");
         
when(keyspace.getTables()).thenReturn(Collections.singletonList(table));
@@ -105,19 +138,28 @@ final class SchemaReporterTest
         when(options.getComment()).thenReturn("table comment");
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processRequested(metadata);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_table.json");
+        assertThat(actual).isEqualTo(expected);
 
-        assertEquals(expected, actual);
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();                      // Validate 
captured metrics:
+        assertThat(metrics.startedRequest.metric.getValue()).isOne();          
                        //  * one execution triggered by request
+        assertThat(metrics.startedSchedule.metric.getValue()).isZero();        
                        //  * zero executions triggered by schedule
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();         
                        //  * one execution resulted in success
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();        
                        //  * zero executions resulted in failure
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();             
                        //  * single number of aspects,
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
         //    equal to thirteen
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();           
                 //  * single duration of execution,
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
  //    that is non-negative
     }
 
     @Test
     void testPrimitiveTypes() throws IOException
     {
         Cluster cluster = mock(Cluster.class);
-        Metadata metadata = mock (Metadata.class);
+        Metadata metadata = mock(Metadata.class);
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
         TableMetadata table = mock(TableMetadata.class);
         TableOptionsMetadata options = mock(TableOptionsMetadata.class);
@@ -133,7 +175,6 @@ final class SchemaReporterTest
         ColumnMetadata c6 = mock(ColumnMetadata.class);
         ColumnMetadata c7 = mock(ColumnMetadata.class);
         ColumnMetadata c8 = mock(ColumnMetadata.class);
-        when(cluster.getClusterName()).thenReturn("sample_cluster");
         when(cluster.getMetadata()).thenReturn(metadata);
         
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
         when(keyspace.getName()).thenReturn("sample_keyspace");
@@ -184,19 +225,28 @@ final class SchemaReporterTest
         when(c8.getType()).thenReturn(DataType.map(DataType.timestamp(), 
DataType.inet(), false));
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processScheduled(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/primitive_types.json");
+        assertThat(actual).isEqualTo(expected);
 
-        assertEquals(expected, actual);
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();                      // Validate 
captured metrics:
+        assertThat(metrics.startedRequest.metric.getValue()).isZero();         
                        //  * zero executions triggered by request
+        assertThat(metrics.startedSchedule.metric.getValue()).isOne();         
                        //  * one execution triggered by schedule
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();         
                        //  * one execution resulted in success
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();        
                        //  * zero executions resulted in failure
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();             
                        //  * single number of aspects,
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
         //    equal to thirteen
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();           
                 //  * single duration of execution,
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
  //    that is non-negative
     }
 
     @Test
     void testUserTypes() throws IOException
     {
         Cluster cluster = mock(Cluster.class);
-        Metadata metadata = mock (Metadata.class);
+        Metadata metadata = mock(Metadata.class);
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
         TableMetadata table = mock(TableMetadata.class);
         TableOptionsMetadata options = mock(TableOptionsMetadata.class);
@@ -211,7 +261,6 @@ final class SchemaReporterTest
         UserType.Field udt1c1 = mock(UserType.Field.class);
         UserType.Field udt1udt2 = mock(UserType.Field.class);
         UserType.Field udt2c2 = mock(UserType.Field.class);
-        when(cluster.getClusterName()).thenReturn("sample_cluster");
         when(cluster.getMetadata()).thenReturn(metadata);
         
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
         when(keyspace.getName()).thenReturn("sample_keyspace");
@@ -257,11 +306,20 @@ final class SchemaReporterTest
         when(udt2c2.getType()).thenReturn(DataType.cboolean());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter, 
metrics).processScheduled(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/user_types.json");
+        assertThat(actual).isEqualTo(expected);
 
-        assertEquals(expected, actual);
+        SchemaReportingMetrics metrics = 
this.metrics.server().schemaReporting();                      // Validate 
captured metrics:
+        assertThat(metrics.startedRequest.metric.getValue()).isZero();         
                        //  * zero executions triggered by request
+        assertThat(metrics.startedSchedule.metric.getValue()).isOne();         
                        //  * one execution triggered by schedule
+        assertThat(metrics.finishedSuccess.metric.getValue()).isOne();         
                        //  * one execution resulted in success
+        assertThat(metrics.finishedFailure.metric.getValue()).isZero();        
                        //  * zero executions resulted in failure
+        assertThat(metrics.sizeAspects.metric.getCount()).isOne();             
                        //  * single number of aspects,
+        
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
         //    equal to thirteen
+        assertThat(metrics.totalDuration.metric.getCount()).isOne();           
                 //  * single duration of execution,
+        
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
  //    that is non-negative
     }
 }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
index ab132684..398ebc00 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
@@ -27,6 +27,23 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 class DeltaGaugeTest
 {
+    @Test
+    void testCumulativeCountIncremented()
+    {
+        DeltaGauge deltaGauge = new DeltaGauge();
+
+        assertThat(deltaGauge.getValue()).isEqualTo(0L); // nothing has been incremented yet
+        deltaGauge.increment();
+        assertThat(deltaGauge.getValue()).isEqualTo(1L); // first read reports the single increment
+        assertThat(deltaGauge.getValue()).isEqualTo(0L); // an immediate second read is expected to report zero, i.e. reading appears to reset the value
+        deltaGauge.increment();
+        deltaGauge.increment();
+        deltaGauge.increment();
+        assertThat(deltaGauge.getValue()).isEqualTo(3L); // three increments since the previous read are reported together
+        assertThat(deltaGauge.getValue()).isEqualTo(0L); // and the value is zero again afterwards
+        assertThat(deltaGauge.getValue()).isEqualTo(0L); // repeated reads without increments stay at zero
+    }
+
     @Test
     void testCumulativeCountUpdated()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to