This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c2af095 CASSSIDECAR-216: Capture Metrics for Schema Reporting (#204)
2c2af095 is described below
commit 2c2af0954472b2dc83e52954bd4fa745659b804c
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Tue Mar 18 11:52:58 2025 -0500
CASSSIDECAR-216: Capture Metrics for Schema Reporting (#204)
Patch by Yuriy Semchyshyn; Reviewed by Francisco Guerrero, Saranya
Krishnakumar, Yifan Cai for CASSSIDECAR-216
---
CHANGES.txt | 1 +
.../sidecar/datahub/IdentifiersProvider.java | 34 +++++-
.../cassandra/sidecar/datahub/SchemaReporter.java | 64 ++++++++--
.../sidecar/datahub/SchemaReportingTask.java | 2 +-
.../sidecar/handlers/ReportSchemaHandler.java | 2 +-
.../cassandra/sidecar/metrics/DeltaGauge.java | 12 +-
.../cassandra/sidecar/metrics/ServerMetrics.java | 6 +
.../sidecar/metrics/ServerMetricsImpl.java | 9 ++
.../metrics/server/SchemaReportingMetrics.java | 73 +++++++++++
.../datahub/SchemaReporterIntegrationTest.java | 47 ++++++-
.../sidecar/datahub/IdentifiersProviderTest.java | 136 +++++++++++++++++++++
.../sidecar/datahub/SchemaReporterTest.java | 112 +++++++++++++----
.../cassandra/sidecar/metrics/DeltaGaugeTest.java | 17 +++
13 files changed, 462 insertions(+), 53 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 90497fa8..baef5ee7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Capture Metrics for Schema Reporting (CASSSIDECAR-216)
* SidecarInstanceCodec is failing to find codec for type (CASSSIDECAR-229)
* Retry Failed Schema Reports (CASSSIDECAR-217)
* Guice modularization (CASSSIDECAR-208)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
index e90f4ca4..189d5189 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.TableMetadata;
@@ -32,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
/**
* An abstract class that has to be extended and instantiated for every
Cassandra
- * cluster that needs its schema converted into a DataHub-compliant format.
+ * cluster that needs its schema converted into a DataHub-compliant format
*/
public abstract class IdentifiersProvider
{
@@ -42,7 +43,26 @@ public abstract class IdentifiersProvider
protected static final String DATA_PLATFORM_INSTANCE =
"dataPlatformInstance";
protected static final String CONTAINER = "container";
protected static final String DATASET = "dataset";
- protected static final String PROD = "PROD"; // DataHub requires this to
be {@code PROD} regardless
+ protected static final String PROD = "PROD"; // DataHub requires this to
be {@code PROD}
+
+ protected static final ToStringStyle STYLE = new ToStringStyle()
+ {{
+ setUseShortClassName(false);
+ setUseClassName(true);
+ setUseIdentityHashCode(false);
+ setUseFieldNames(false);
+ setContentStart("(");
+ setFieldSeparatorAtStart(false);
+ setFieldSeparator(",");
+ setFieldSeparatorAtEnd(false);
+ setContentEnd(")");
+ setDefaultFullDetail(true);
+ setArrayContentDetail(true);
+ setArrayStart("(");
+ setArraySeparator(",");
+ setArrayEnd(")");
+ setNullText("null");
+ }};
/**
* A public getter method that returns the name of Cassandra Organization
@@ -231,13 +251,19 @@ public abstract class IdentifiersProvider
@NotNull
public String toString()
{
- return new ToStringBuilder(this)
+ return new ToStringBuilder(this, STYLE)
.append(this.organization())
.append(this.platform())
.append(this.environment())
.append(this.application())
.append(this.cluster())
.append(this.identifier())
- .toString();
+ .toString()
+ .replaceAll("\\s", "");
+
+ // Use of a custom {@link ToStringStyle} implementation prevents the
hash code from being
+ // included into the {@link String} representation; which, in
conjunction with the removal
+ // of all whitespace characters, simplifies extraction of {@link
IdentifierProvider}
+ // objects from the execution logs; without negatively affecting the
readability much
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
index 4ad3e3f3..8e8d7fcc 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
@@ -23,7 +23,10 @@ import java.util.List;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Timer;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
@@ -34,6 +37,9 @@ import com.linkedin.data.template.RecordTemplate;
import datahub.client.Emitter;
import datahub.event.MetadataChangeProposalWrapper;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+import org.apache.cassandra.sidecar.metrics.DeltaGauge;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
import org.jetbrains.annotations.NotNull;
/**
@@ -47,6 +53,8 @@ import org.jetbrains.annotations.NotNull;
@Singleton
public class SchemaReporter
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaReporter.class);
+
@NotNull
protected final IdentifiersProvider identifiersProvider;
@NotNull
@@ -57,6 +65,8 @@ public class SchemaReporter
protected final List<TableToAspectConverter<? extends RecordTemplate>>
tableConverters;
@NotNull
protected final EmitterFactory emitterFactory;
+ @NotNull
+ protected final SchemaReportingMetrics reportingMetrics;
/**
* The public constructor that instantiates {@link SchemaReporter} with
default configuration.
@@ -66,10 +76,12 @@ public class SchemaReporter
*
* @param identifiersProvider an instance of {@link IdentifiersProvider}
to use
* @param emitterFactory an instance of {@link EmitterFactory} to use
+ * @param sidecarMetrics an instance of {@link SidecarMetrics} to obtain
{@link SchemaReportingMetrics} from
*/
@Inject
public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
- @NotNull EmitterFactory emitterFactory)
+ @NotNull EmitterFactory emitterFactory,
+ @NotNull SidecarMetrics sidecarMetrics)
{
this(identifiersProvider,
ImmutableList.of(new
ClusterToDataPlatformInfoConverter(identifiersProvider),
@@ -85,7 +97,8 @@ public class SchemaReporter
new
TableToDataPlatformInstanceConverter(identifiersProvider),
new
TableToBrowsePathsV2Converter(identifiersProvider),
new
TableToBrowsePathsConverter(identifiersProvider)),
- emitterFactory);
+ emitterFactory,
+ sidecarMetrics.server().schemaReporting());
}
/**
@@ -96,44 +109,73 @@ public class SchemaReporter
* @param keyspaceConverters a {@link List} of {@link
KeyspaceToAspectConverter} instances to use
* @param tableConverters a {@link List} of {@link TableToAspectConverter}
instances to use
* @param emitterFactory an instance of {@link EmitterFactory} to use
+ * @param reportingMetrics an instance of {@link SchemaReportingMetrics}
to use
*/
protected SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
@NotNull List<ClusterToAspectConverter<? extends
RecordTemplate>> clusterConverters,
@NotNull List<KeyspaceToAspectConverter<? extends
RecordTemplate>> keyspaceConverters,
@NotNull List<TableToAspectConverter<? extends
RecordTemplate>> tableConverters,
- @NotNull EmitterFactory emitterFactory)
+ @NotNull EmitterFactory emitterFactory,
+ @NotNull SchemaReportingMetrics reportingMetrics)
{
this.identifiersProvider = identifiersProvider;
this.clusterConverters = clusterConverters;
this.keyspaceConverters = keyspaceConverters;
this.tableConverters = tableConverters;
this.emitterFactory = emitterFactory;
+ this.reportingMetrics = reportingMetrics;
}
/**
- * Public method for converting and reporting the Cassandra schema
+ * Public method for converting and reporting the Cassandra schema when
triggered by a scheduled periodic task
*
* @param cluster the {@link Cluster} to extract Cassandra schema from
*/
- public void process(@NotNull Cluster cluster)
+ public void processScheduled(@NotNull Cluster cluster)
+ {
+ process(cluster.getMetadata(),
reportingMetrics.startedSchedule.metric);
+ }
+
+ /**
+ * Public method for converting and reporting the Cassandra schema when
triggered by a received API request
+ *
+ * @param metadata the {@link Metadata} to extract Cassandra schema from
+ */
+ public void processRequested(@NotNull Metadata metadata)
{
- process(cluster.getMetadata());
+ process(metadata, reportingMetrics.startedRequest.metric);
}
/**
- * Public method for converting and reporting the Cassandra schema
+ * Private method for converting and reporting the Cassandra schema
*
* @param metadata the {@link Metadata} to extract Cassandra schema from
+ * @param started the {@link DeltaGauge} for the metric counting
invocations
*/
- public void process(@NotNull Metadata metadata)
+ private void process(@NotNull Metadata metadata,
+ @NotNull DeltaGauge started)
{
+ LOGGER.info("Starting to report schema for cluster, identifiers={}",
identifiersProvider);
+ started.increment();
+
try (Emitter emitter = emitterFactory.emitter())
{
- stream(metadata).forEach(ThrowableUtils.consumer(emitter::emit));
+ Timer.Context timer = reportingMetrics.totalDuration.metric.time();
+ long aspects =
stream(metadata).map(ThrowableUtils.function(emitter::emit))
+ .count();
+
+ timer.close();
+ reportingMetrics.sizeAspects.metric.update(aspects);
+ reportingMetrics.finishedSuccess.metric.increment();
+ LOGGER.info("Successfully reported schema for cluster,
identifiers={}", identifiersProvider);
}
catch (Exception exception)
{
- throw new RuntimeException("Cannot extract schema for cluster " +
identifiersProvider.cluster(), exception);
+ reportingMetrics.finishedFailure.metric.increment();
+ LOGGER.error("Failed to report schema for cluster,
identifiers={}", identifiersProvider);
+
+ throw new RuntimeException("Failed to report schema for cluster
with identifiers " + identifiersProvider,
+ exception);
}
}
@@ -145,7 +187,6 @@ public class SchemaReporter
* @return non-empty {@link Stream} of DataHub aspects
*/
@NotNull
- @SuppressWarnings("UnstableApiUsage")
protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>>
stream(@NotNull Metadata metadata)
{
return Streams.concat(
@@ -165,7 +206,6 @@ public class SchemaReporter
* @return non-empty {@link Stream} of DataHub aspects
*/
@NotNull
- @SuppressWarnings("UnstableApiUsage")
protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>>
stream(@NotNull KeyspaceMetadata keyspace)
{
return Streams.concat(
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
index 00970c41..d91dd8fb 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java
@@ -108,7 +108,7 @@ public class SchemaReportingTask implements PeriodicTask,
ExecuteOnClusterLeaseh
{
try
{
- reporter.process(session.get().getCluster());
+ reporter.processScheduled(session.get().getCluster());
LOGGER.info("Schema report has been completed successfully on
attempt {}", attempt);
promise.complete();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
index 150ef07e..7a009b38 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ReportSchemaHandler.java
@@ -98,7 +98,7 @@ public class ReportSchemaHandler extends
AbstractHandler<Void> implements Access
Metadata metadata =
metadataFetcher.callOnFirstAvailableInstance(instance ->
instance.delegate().metadata());
executorPools.service()
- .runBlocking(() -> schemaReporter.process(metadata))
+ .runBlocking(() ->
schemaReporter.processRequested(metadata))
.onSuccess(ignored -> context.json(OK_STATUS))
.onFailure(throwable -> processFailure(throwable,
context, host, address, request));
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
index aae5bd66..b2ae14a6 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java
@@ -33,7 +33,15 @@ public class DeltaGauge implements Gauge<Long>, Metric
public DeltaGauge()
{
- this.count = new AtomicLong();
+ this.count = new AtomicLong(0L);
+ }
+
+ /**
+ * Increments the cumulative value tracked by this {@link DeltaGauge}
+ */
+ public void increment()
+ {
+ count.incrementAndGet();
}
/**
@@ -55,6 +63,6 @@ public class DeltaGauge implements Gauge<Long>, Metric
@Override
public Long getValue()
{
- return count.getAndSet(0);
+ return count.getAndSet(0L);
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
index 0fb32eb4..ea8d7857 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.metrics;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX;
@@ -49,6 +50,11 @@ public interface ServerMetrics
*/
SchemaMetrics schema();
+ /**
+ * @return metrics for the schema reporting
+ */
+ SchemaReportingMetrics schemaReporting();
+
/**
* @return metrics related to internal caches that are tracked.
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
index 75b0a248..15e31e6a 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.metrics;
import java.util.Objects;
import com.codahale.metrics.MetricRegistry;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
/**
* {@link ServerMetrics} tracks metrics related to Sidecar server.
@@ -32,6 +33,7 @@ public class ServerMetricsImpl implements ServerMetrics
protected final ResourceMetrics resourceMetrics;
protected final RestoreMetrics restoreMetrics;
protected final SchemaMetrics schemaMetrics;
+ protected final SchemaReportingMetrics schemaReportingMetrics;
protected final CacheMetrics cacheMetrics;
protected final CoordinationMetrics coordinationMetrics;
@@ -43,6 +45,7 @@ public class ServerMetricsImpl implements ServerMetrics
this.resourceMetrics = new ResourceMetrics(metricRegistry);
this.restoreMetrics = new RestoreMetrics(metricRegistry);
this.schemaMetrics = new SchemaMetrics(metricRegistry);
+ this.schemaReportingMetrics = new
SchemaReportingMetrics(metricRegistry);
this.cacheMetrics = new CacheMetrics(metricRegistry);
this.coordinationMetrics = new CoordinationMetrics(metricRegistry);
}
@@ -71,6 +74,12 @@ public class ServerMetricsImpl implements ServerMetrics
return schemaMetrics;
}
+ @Override
+ public SchemaReportingMetrics schemaReporting()
+ {
+ return schemaReportingMetrics;
+ }
+
@Override
public CacheMetrics cache()
{
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
new file mode 100644
index 00000000..1b576ce5
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.metrics.server;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.sidecar.metrics.DeltaGauge;
+import org.apache.cassandra.sidecar.metrics.NamedMetric;
+import org.apache.cassandra.sidecar.metrics.ServerMetrics;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Tracks metrics for the schema reporting done by Sidecar
+ */
+public class SchemaReportingMetrics
+{
+ protected static final String DOMAIN = ServerMetrics.SERVER_PREFIX +
".SchemaReporting";
+
+ public final NamedMetric<DeltaGauge> startedRequest;
+ public final NamedMetric<DeltaGauge> startedSchedule;
+ public final NamedMetric<DeltaGauge> finishedSuccess;
+ public final NamedMetric<DeltaGauge> finishedFailure;
+ public final NamedMetric<Histogram> sizeAspects;
+ public final NamedMetric<Timer> totalDuration;
+
+ public SchemaReportingMetrics(@NotNull MetricRegistry registry)
+ {
+ startedSchedule = NamedMetric.builder(name -> registry.gauge(name,
DeltaGauge::new))
+ .withDomain(DOMAIN)
+ .withName("Scheduled")
+ .build();
+ startedRequest = NamedMetric.builder(name -> registry.gauge(name,
DeltaGauge::new))
+ .withDomain(DOMAIN)
+ .withName("Requested")
+ .build();
+
+ finishedSuccess = NamedMetric.builder(name -> registry.gauge(name,
DeltaGauge::new))
+ .withDomain(DOMAIN)
+ .withName("Succeeded")
+ .build();
+ finishedFailure = NamedMetric.builder(name -> registry.gauge(name,
DeltaGauge::new))
+ .withDomain(DOMAIN)
+ .withName("Failed")
+ .build();
+
+ sizeAspects = NamedMetric.builder(registry::histogram)
+ .withDomain(DOMAIN)
+ .withName("Aspects")
+ .build();
+
+ totalDuration = NamedMetric.builder(registry::timer)
+ .withDomain(DOMAIN)
+ .withName("Duration")
+ .build();
+ }
+}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
index 9650e87b..eaa4fdcb 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
@@ -21,12 +21,20 @@ package org.apache.cassandra.sidecar.datahub;
import java.io.IOException;
import java.io.StringReader;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import com.codahale.metrics.SharedMetricRegistries;
import com.datastax.driver.core.Session;
import com.linkedin.data.DataList;
import com.linkedin.data.codec.JacksonDataCodec;
import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
import org.apache.cassandra.testing.CassandraIntegrationTest;
import org.jetbrains.annotations.NotNull;
@@ -41,6 +49,23 @@ final class SchemaReporterIntegrationTest extends
IntegrationTestBase
{
private static final IdentifiersProvider IDENTIFIERS = new
TestIdentifiers();
private static final JacksonDataCodec CODEC = new JacksonDataCodec();
+ private static final MetricRegistryFactory FACTORY = new
MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(),
+
Collections.emptyList(),
+
Collections.emptyList());
+
+ private SidecarMetrics metrics;
+
+ @BeforeEach
+ void beforeEach()
+ {
+ metrics = new SidecarMetricsImpl(FACTORY, null);
+ }
+
+ @AfterEach
+ void afterEach()
+ {
+ SharedMetricRegistries.clear();
+ }
/**
* Private helper method that removes all numeric suffixes added
non-deterministically
@@ -97,18 +122,28 @@ final class SchemaReporterIntegrationTest extends
IntegrationTestBase
JsonEmitter emitter = new JsonEmitter();
try (Session session = maybeGetSession())
{
- new SchemaReporter(IDENTIFIERS, () ->
emitter).process(session.getCluster());
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processScheduled(session.getCluster());
}
- String actualJson = normalizeNames(emitter.content());
+ String actualJson = normalizeNames(emitter.content());
String expectedJson =
IOUtils.readFully("/datahub/integration_test.json");
-
assertThat(actualJson).isEqualToNormalizingWhitespace(expectedJson);
- // Finally, make sure the returned schema produces the same tree of
+ // Second, make sure the returned schema produces the same tree of
// DataHub objects after having been normalized and deserialized
- DataList actualData = CODEC.readList(new StringReader(actualJson));
+ DataList actualData = CODEC.readList(new StringReader(actualJson));
DataList expectedData = CODEC.readList(new StringReader(expectedJson));
-
assertThat(actualData).isEqualTo(expectedData);
+
+ // Third, validate the captured metrics: one execution triggered by
the schedule and
+ // completed successfully, with thirteen aspects produced in zero or
more milliseconds
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting();
+ assertThat(metrics.startedRequest.metric.getValue()).isZero();
+ assertThat(metrics.startedSchedule.metric.getValue()).isOne();
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
new file mode 100644
index 00000000..949e79f4
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/IdentifiersProviderTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.datahub;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+import org.jetbrains.annotations.NotNull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link IdentifiersProvider}
+ */
+final class IdentifiersProviderTest
+{
+ private static final IdentifiersProvider IDENTIFIERS = new
TestIdentifiers(); // Single instance of immutable class
+
+ /**
+ * Tests the default values for the individual components of the composite
identifier
+ */
+ @Test
+ void testValues()
+ {
+ String organization = "Cassandra";
+ String platform = "cassandra";
+ String environment = "ENVIRONMENT";
+ String application = "application";
+ String cluster = "cluster";
+
+ assertThat(IDENTIFIERS.organization()).isEqualTo(organization);
+ assertThat(IDENTIFIERS.platform()).isEqualTo(platform);
+ assertThat(IDENTIFIERS.environment()).isEqualTo(environment);
+ assertThat(IDENTIFIERS.application()).isEqualTo(application);
+ assertThat(IDENTIFIERS.cluster()).isEqualTo(cluster);
+ }
+
+ /**
+ * Tests the generation algorithm for the unique identifier of the
composite identifier
+ */
+ @Test
+ void testIdentifier()
+ {
+ UUID identifier =
UUID.fromString("ace3ba6b-49b2-3dd5-955a-1de13730188b"); // Calculated
deterministically
+
+ assertThat(IDENTIFIERS.identifier()).isEqualTo(identifier);
+ }
+
+ /**
+ * Tests the generation formats for the URN identifiers of the composite
identifier
+ */
+ @Test
+ void testUrns()
+ {
+ KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
+ TableMetadata table = mock(TableMetadata.class);
+ when(keyspace.getName()).thenReturn("keyspace");
+ when(table.getName()).thenReturn("table");
+ when(keyspace.getTables()).thenReturn(Collections.singleton(table));
+ when(table.getKeyspace()).thenReturn(keyspace);
+
+ String urnDataPlatform = "urn:li:dataPlatform:cassandra";
+ String urnDataPlatformInstance = "urn:li:dataPlatformInstance:" +
+
"(urn:li:dataPlatform:cassandra,ace3ba6b-49b2-3dd5-955a-1de13730188b)";
+ String urnContainer =
"urn:li:container:ace3ba6b-49b2-3dd5-955a-1de13730188b_keyspace";
+ String urnDataset = "urn:li:dataset:" +
+
"(urn:li:dataPlatform:cassandra,ace3ba6b-49b2-3dd5-955a-1de13730188b.keyspace.table,PROD)";
+
+ assertThat(IDENTIFIERS.urnDataPlatform()).isEqualTo(urnDataPlatform);
+
assertThat(IDENTIFIERS.urnDataPlatformInstance()).isEqualTo(urnDataPlatformInstance);
+ assertThat(IDENTIFIERS.urnContainer(keyspace)).isEqualTo(urnContainer);
+ assertThat(IDENTIFIERS.urnDataset(table)).isEqualTo(urnDataset);
+ }
+
+ /**
+ * Tests the method implementations of {@link IdentifiersProvider#equals}
and {@link IdentifiersProvider#hashCode}
+ */
+ @Test
+ void testEqualsAndHashCode()
+ {
+ IdentifiersProvider same = new IdentifiersProvider()
+ {
+ @Override
+ @NotNull
+ public String cluster()
+ {
+ return "cluster";
+ }
+ };
+ IdentifiersProvider different = new IdentifiersProvider()
+ {
+ @Override
+ @NotNull
+ public String cluster()
+ {
+ return "qwerty";
+ }
+ };
+
+ assertThat(IDENTIFIERS).isEqualTo(same).isNotEqualTo(different);
+
assertThat(IDENTIFIERS).hasSameHashCodeAs(same).doesNotHaveSameHashCodeAs(different);
+ }
+
+ /**
+ * Tests the method implementation of {@link IdentifiersProvider#toString}
+ */
+ @Test
+ void testToString()
+ {
+ String string = "org.apache.cassandra.sidecar.datahub.TestIdentifiers"
+
+
"(Cassandra,cassandra,ENVIRONMENT,application,cluster,ace3ba6b-49b2-3dd5-955a-1de13730188b)";
+
+ assertThat(IDENTIFIERS).hasToString(string);
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
index 7a7b0f2f..387aecdc 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
@@ -22,8 +22,11 @@ package org.apache.cassandra.sidecar.datahub;
import java.io.IOException;
import java.util.Collections;
import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import com.codahale.metrics.SharedMetricRegistries;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
@@ -33,8 +36,12 @@ import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.TableOptionsMetadata;
import com.datastax.driver.core.UserType;
import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
+import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,56 +52,82 @@ import static org.mockito.Mockito.when;
final class SchemaReporterTest
{
private static final IdentifiersProvider IDENTIFIERS = new
TestIdentifiers();
+ private static final MetricRegistryFactory FACTORY = new
MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(),
+
Collections.emptyList(),
+
Collections.emptyList());
+
+ private SidecarMetrics metrics;
+
+ @BeforeEach
+ void beforeEach()
+ {
+ metrics = new SidecarMetricsImpl(FACTORY, null);
+ }
+
+ @AfterEach
+ void afterEach()
+ {
+ SharedMetricRegistries.clear();
+ }
@Test
void testEmptyCluster() throws IOException
{
- Cluster cluster = mock(Cluster.class);
- Metadata metadata = mock (Metadata.class);
- when(cluster.getClusterName()).thenReturn("sample_cluster");
- when(cluster.getMetadata()).thenReturn(metadata);
+ Metadata metadata = mock(Metadata.class);
when(metadata.getKeyspaces()).thenReturn(Collections.emptyList());
JsonEmitter emitter = new JsonEmitter();
- new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processRequested(metadata);
String actual = emitter.content();
String expected = IOUtils.readFully("/datahub/empty_cluster.json");
+ assertThat(actual).isEqualTo(expected);
- assertEquals(expected, actual);
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting(); // Validate
captured metrics:
+ assertThat(metrics.startedRequest.metric.getValue()).isOne();
// * one execution triggered by request
+ assertThat(metrics.startedSchedule.metric.getValue()).isZero();
// * zero executions triggered by schedule
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
// * one execution resulted in success
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
// * zero executions resulted in failure
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
// * single number of aspects,
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(2L);
// equal to two
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
// * single duration of execution,
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
// that is non-negative
}
@Test
void testEmptyKeyspace() throws IOException
{
- Cluster cluster = mock(Cluster.class);
- Metadata metadata = mock (Metadata.class);
+ Metadata metadata = mock(Metadata.class);
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
- when(cluster.getClusterName()).thenReturn("sample_cluster");
- when(cluster.getMetadata()).thenReturn(metadata);
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
when(keyspace.getName()).thenReturn("sample_keyspace");
when(keyspace.getTables()).thenReturn(Collections.emptyList());
JsonEmitter emitter = new JsonEmitter();
- new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processRequested(metadata);
String actual = emitter.content();
String expected = IOUtils.readFully("/datahub/empty_keyspace.json");
+ assertThat(actual).isEqualTo(expected);
- assertEquals(expected, actual);
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting(); // Validate
captured metrics:
+ assertThat(metrics.startedRequest.metric.getValue()).isOne();
// * one execution triggered by request
+ assertThat(metrics.startedSchedule.metric.getValue()).isZero();
// * zero executions triggered by schedule
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
// * one execution resulted in success
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
// * zero executions resulted in failure
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
// * single number of aspects,
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(6L);
// equal to six
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
// * single duration of execution,
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
// that is non-negative
}
@Test
void testEmptyTable() throws IOException
{
- Cluster cluster = mock(Cluster.class);
- Metadata metadata = mock (Metadata.class);
+ Metadata metadata = mock(Metadata.class);
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
TableMetadata table = mock(TableMetadata.class);
TableOptionsMetadata options = mock(TableOptionsMetadata.class);
- when(cluster.getClusterName()).thenReturn("sample_cluster");
- when(cluster.getMetadata()).thenReturn(metadata);
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
when(keyspace.getName()).thenReturn("sample_keyspace");
when(keyspace.getTables()).thenReturn(Collections.singletonList(table));
@@ -105,19 +138,28 @@ final class SchemaReporterTest
when(options.getComment()).thenReturn("table comment");
JsonEmitter emitter = new JsonEmitter();
- new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processRequested(metadata);
String actual = emitter.content();
String expected = IOUtils.readFully("/datahub/empty_table.json");
+ assertThat(actual).isEqualTo(expected);
- assertEquals(expected, actual);
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting(); // Validate
captured metrics:
+ assertThat(metrics.startedRequest.metric.getValue()).isOne();
// * one execution triggered by request
+ assertThat(metrics.startedSchedule.metric.getValue()).isZero();
// * zero executions triggered by schedule
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
// * one execution resulted in success
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
// * zero executions resulted in failure
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
// * single number of aspects,
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
// equal to thirteen
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
// * single duration of execution,
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
// that is non-negative
}
@Test
void testPrimitiveTypes() throws IOException
{
Cluster cluster = mock(Cluster.class);
- Metadata metadata = mock (Metadata.class);
+ Metadata metadata = mock(Metadata.class);
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
TableMetadata table = mock(TableMetadata.class);
TableOptionsMetadata options = mock(TableOptionsMetadata.class);
@@ -133,7 +175,6 @@ final class SchemaReporterTest
ColumnMetadata c6 = mock(ColumnMetadata.class);
ColumnMetadata c7 = mock(ColumnMetadata.class);
ColumnMetadata c8 = mock(ColumnMetadata.class);
- when(cluster.getClusterName()).thenReturn("sample_cluster");
when(cluster.getMetadata()).thenReturn(metadata);
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
when(keyspace.getName()).thenReturn("sample_keyspace");
@@ -184,19 +225,28 @@ final class SchemaReporterTest
when(c8.getType()).thenReturn(DataType.map(DataType.timestamp(),
DataType.inet(), false));
JsonEmitter emitter = new JsonEmitter();
- new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processScheduled(cluster);
String actual = emitter.content();
String expected = IOUtils.readFully("/datahub/primitive_types.json");
+ assertThat(actual).isEqualTo(expected);
- assertEquals(expected, actual);
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting(); // Validate
captured metrics:
+ assertThat(metrics.startedRequest.metric.getValue()).isZero();
// * zero executions triggered by request
+ assertThat(metrics.startedSchedule.metric.getValue()).isOne();
// * one execution triggered by schedule
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
// * one execution resulted in success
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
// * zero executions resulted in failure
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
// * single number of aspects,
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
// equal to thirteen
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
// * single duration of execution,
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
// that is non-negative
}
@Test
void testUserTypes() throws IOException
{
Cluster cluster = mock(Cluster.class);
- Metadata metadata = mock (Metadata.class);
+ Metadata metadata = mock(Metadata.class);
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
TableMetadata table = mock(TableMetadata.class);
TableOptionsMetadata options = mock(TableOptionsMetadata.class);
@@ -211,7 +261,6 @@ final class SchemaReporterTest
UserType.Field udt1c1 = mock(UserType.Field.class);
UserType.Field udt1udt2 = mock(UserType.Field.class);
UserType.Field udt2c2 = mock(UserType.Field.class);
- when(cluster.getClusterName()).thenReturn("sample_cluster");
when(cluster.getMetadata()).thenReturn(metadata);
when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace));
when(keyspace.getName()).thenReturn("sample_keyspace");
@@ -257,11 +306,20 @@ final class SchemaReporterTest
when(udt2c2.getType()).thenReturn(DataType.cboolean());
JsonEmitter emitter = new JsonEmitter();
- new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
+ new SchemaReporter(IDENTIFIERS, () -> emitter,
metrics).processScheduled(cluster);
String actual = emitter.content();
String expected = IOUtils.readFully("/datahub/user_types.json");
+ assertThat(actual).isEqualTo(expected);
- assertEquals(expected, actual);
+ SchemaReportingMetrics metrics =
this.metrics.server().schemaReporting(); // Validate
captured metrics:
+ assertThat(metrics.startedRequest.metric.getValue()).isZero();
// * zero executions triggered by request
+ assertThat(metrics.startedSchedule.metric.getValue()).isOne();
// * one execution triggered by schedule
+ assertThat(metrics.finishedSuccess.metric.getValue()).isOne();
// * one execution resulted in success
+ assertThat(metrics.finishedFailure.metric.getValue()).isZero();
// * zero executions resulted in failure
+ assertThat(metrics.sizeAspects.metric.getCount()).isOne();
// * single number of aspects,
+
assertThat(metrics.sizeAspects.metric.getSnapshot().getValues()).containsExactly(13L);
// equal to thirteen
+ assertThat(metrics.totalDuration.metric.getCount()).isOne();
// * single duration of execution,
+
assertThat(metrics.totalDuration.metric.getSnapshot().getValues()[0]).isNotNegative();
// that is non-negative
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
index ab132684..398ebc00 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java
@@ -27,6 +27,23 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
class DeltaGaugeTest
{
+ @Test
+ void testCumulativeCountIncremented()
+ {
+ DeltaGauge deltaGauge = new DeltaGauge();
+
+ assertThat(deltaGauge.getValue()).isEqualTo(0L);
+ deltaGauge.increment();
+ assertThat(deltaGauge.getValue()).isEqualTo(1L);
+ assertThat(deltaGauge.getValue()).isEqualTo(0L);
+ deltaGauge.increment();
+ deltaGauge.increment();
+ deltaGauge.increment();
+ assertThat(deltaGauge.getValue()).isEqualTo(3L);
+ assertThat(deltaGauge.getValue()).isEqualTo(0L);
+ assertThat(deltaGauge.getValue()).isEqualTo(0L);
+ }
+
@Test
void testCumulativeCountUpdated()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]