This is an automated email from the ASF dual-hosted git repository.
lukasz-antoniak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff0b90f61 CASSSIDECAR-443: Support column types not parseable by Java
3.x driver (#340)
ff0b90f61 is described below
commit ff0b90f6115721699b78f5401720fac6429df519
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Fri May 8 08:34:22 2026 +0200
CASSSIDECAR-443: Support column types not parseable by Java 3.x driver
(#340)
Patch by Lukasz Antoniak; Reviewed by Yifan Cai and Shailaja Koppu for
CASSSIDECAR-443
---
CHANGES.txt | 1 +
.../CassandraVectorSchemaRouteIntegrationTest.java | 70 +++++
.../common/server/data/QualifiedTableName.java | 21 ++
.../cassandra/bridge/CassandraBridgeFactory.java | 1 -
.../cluster/locator/CachedLocalTokenRanges.java | 2 +-
.../sidecar/config/DriverConfiguration.java | 8 +
.../config/yaml/DriverConfigurationImpl.java | 23 +-
.../cassandra/sidecar/db/CQLSchemaAccessor.java | 98 +++++++
.../sidecar/db/DriverUnsupportedSchemaCache.java | 293 +++++++++++++++++++++
.../sidecar/handlers/KeyspaceSchemaHandler.java | 53 +++-
.../cassandra/sidecar/handlers/SchemaHandler.java | 7 +-
.../sstableuploads/SSTableUploadHandler.java | 63 +++--
.../validations/ValidateTableExistenceHandler.java | 39 ++-
.../cassandra/sidecar/modules/CdcModule.java | 4 +-
.../sidecar/modules/ConfigurationModule.java | 31 +++
.../modules/multibindings/PeriodicTaskMapKeys.java | 1 +
.../tasks/CassandraClusterSchemaMonitor.java | 15 +-
.../SSTableVectorUploadHandlerIntegrationTest.java | 228 ++++++++++++++++
.../sidecar/testing/IntegrationTestModule.java | 17 +-
.../org/apache/cassandra/sidecar/TestModule.java | 11 +
.../sidecar/db/CQLSchemaAccessorTest.java | 107 ++++++++
.../db/DriverUnsupportedSchemaCacheTest.java | 128 +++++++++
.../sidecar/handlers/SchemaHandlerTest.java | 14 +-
.../tasks/CassandraClusterSchemaMonitorTest.java | 10 +-
24 files changed, 1189 insertions(+), 56 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ac93f399..0ac2ac99b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Support column types not parseable by Java 3.x driver (CASSSIDECAR-443)
* CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to
null (CASSSIDECAR-417)
* Fix circle CI pipelines OOM (CASSSIDECAR-423)
* Add JDK11_OPTIONS to the startup script (CASSSIDECAR-416)
diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java
new file mode 100644
index 000000000..e759ef67e
--- /dev/null
+++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.http.HttpResponseExpectation;
+import org.apache.cassandra.sidecar.common.response.SchemaResponse;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+class CassandraVectorSchemaRouteIntegrationTest extends SharedClusterSidecarIntegrationTestBase
+{
+ protected static final int MIN_VERSION_WITH_VECTOR = 5;
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace("test_keyspace", Map.of("replication_factor", 1));
+ createTestTable(new QualifiedName("test_keyspace", "int_table"),
+ "CREATE TABLE IF NOT EXISTS %s (a int, b int, PRIMARY KEY (a))");
+ createTestTable(new QualifiedName("test_keyspace", "vector_table"),
+ "CREATE TABLE IF NOT EXISTS %s (a int, b vector<float, 3>, PRIMARY KEY (a))");
+ }
+
+ @Override
+ protected void beforeClusterProvisioning()
+ {
+ assumeThat(SimpleCassandraVersion.create(testVersion.version()).major)
+ .as("Vector type is supported since Cassandra 5.0")
+ .isGreaterThanOrEqualTo(MIN_VERSION_WITH_VECTOR);
+ }
+
+ @Test
+ void testSchemaHandlerWithVectorTable()
+ {
+ String testRoute = "/api/v1/schema/keyspaces/test_keyspace";
+ SchemaResponse response = getBlocking(trustedClient()
+ .get(serverWrapper.serverPort, "localhost", testRoute)
+ .send()
+ .expecting(HttpResponseExpectation.SC_OK))
+ .bodyAsJson(SchemaResponse.class);
+ assertThat(response).isNotNull();
+ assertThat(response.keyspace()).isEqualTo("test_keyspace");
+ assertThat(response.schema()).contains("vector_table");
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java
index 35379035b..ee504fbc2 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java
@@ -117,6 +117,27 @@ public class QualifiedTableName
return table;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (!(o instanceof QualifiedTableName))
+ {
+ return false;
+ }
+ QualifiedTableName that = (QualifiedTableName) o;
+ return Objects.equals(keyspace, that.keyspace) && Objects.equals(table, that.table);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(keyspace, table);
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
index 466bc4286..9c99430a5 100644
---
a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
+++
b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-
import jakarta.inject.Singleton;
import org.jetbrains.annotations.NotNull;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java
index c70a69080..d22767c62 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java
@@ -100,7 +100,7 @@ public class CachedLocalTokenRanges implements
LocalTokenRangesProvider
}
catch (CassandraUnavailableException ignored)
{
- LOGGER.debug("Not yet connect to Cassandra cluster");
+ LOGGER.debug("Not yet connected to Cassandra cluster");
return Collections.emptyMap();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
index 84b60f66c..412a82c1f 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.sidecar.config;
import java.net.InetSocketAddress;
import java.util.List;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
/**
* The driver configuration to use when connecting to Cassandra
*/
@@ -61,4 +63,10 @@ public interface DriverConfiguration
* Cassandra instance.
*/
SslConfiguration sslConfiguration();
+
+ /**
+ * @return Refresh interval of table schemas not supported by Java
driver's metadata
+ * (not parseable, e.g. including vector type).
+ */
+ SecondBoundConfiguration unsupportedTableSchemaRefreshTime();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
index 8755cc00b..b4e1aafc2 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.sidecar.config.yaml;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.config.DriverConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
@@ -32,6 +34,8 @@ import org.apache.cassandra.sidecar.config.SslConfiguration;
public class DriverConfigurationImpl implements DriverConfiguration
{
private static final int DEFAULT_NUM_CONNECTIONS = 1000;
+ private static final SecondBoundConfiguration DEFAULT_UNSUPPORTED_TABLE_SCHEMA_REFRESH_TIME = new SecondBoundConfiguration(5, TimeUnit.MINUTES);
+
@JsonProperty("contact_points")
private final List<InetSocketAddress> contactPoints;
@@ -50,9 +54,12 @@ public class DriverConfigurationImpl implements
DriverConfiguration
@JsonProperty("ssl")
private final SslConfiguration sslConfiguration;
+ @JsonProperty("unsupported_table_schema_refresh_time")
+ private final SecondBoundConfiguration unsupportedTableSchemaRefreshTime;
+
public DriverConfigurationImpl()
{
- this(Collections.emptyList(), null, DEFAULT_NUM_CONNECTIONS, null,
null, null);
+ this(Collections.emptyList(), null, DEFAULT_NUM_CONNECTIONS, null,
null, null, DEFAULT_UNSUPPORTED_TABLE_SCHEMA_REFRESH_TIME);
}
public DriverConfigurationImpl(List<InetSocketAddress> contactPoints,
@@ -60,7 +67,8 @@ public class DriverConfigurationImpl implements
DriverConfiguration
int numConnections,
String username,
String password,
- SslConfiguration sslConfiguration)
+ SslConfiguration sslConfiguration,
+ SecondBoundConfiguration
unsupportedTableSchemaRefreshTime)
{
this.contactPoints = contactPoints;
this.localDc = localDc;
@@ -68,6 +76,7 @@ public class DriverConfigurationImpl implements
DriverConfiguration
this.username = username;
this.password = password;
this.sslConfiguration = sslConfiguration;
+ this.unsupportedTableSchemaRefreshTime =
unsupportedTableSchemaRefreshTime;
}
/**
@@ -129,4 +138,14 @@ public class DriverConfigurationImpl implements
DriverConfiguration
{
return sslConfiguration;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @JsonProperty("unsupported_table_schema_refresh_time")
+ public SecondBoundConfiguration unsupportedTableSchemaRefreshTime()
+ {
+ return unsupportedTableSchemaRefreshTime;
+ }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java
new file mode 100644
index 000000000..4b36cbf00
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Component allowing to read CQL schema by executing {@code DESCRIBE}
statement.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class CQLSchemaAccessor
+{
+ private final CQLSessionProvider sessionProvider;
+
+ public CQLSchemaAccessor(CQLSessionProvider sessionProvider)
+ {
+ this.sessionProvider = sessionProvider;
+ }
+
+ @NotNull
+ public Set<Name> getKeyspaces()
+ {
+ Session session = sessionProvider.get();
+ Set<Name> keyspaces = new HashSet<>();
+ List<Row> rows = session.execute("DESCRIBE KEYSPACES").all();
+ for (Row row : rows)
+ {
+ Name keyspaceName = new Name(row.getString("keyspace_name"));
+ keyspaces.add(keyspaceName);
+ }
+ return keyspaces;
+ }
+
+ @Nullable
+ public List<String> getKeyspaceSchema(@NotNull Name keyspace)
+ {
+ Session session = sessionProvider.get();
+ String statement = String.format("DESCRIBE KEYSPACE %s",
keyspace.maybeQuotedName());
+ return describe(session, statement);
+ }
+
+ @Nullable
+ public List<String> getTableSchema(@NotNull Name keyspace, @NotNull Name
table)
+ {
+ Session session = sessionProvider.get();
+ String statement = String.format("DESCRIBE TABLE %s.%s",
keyspace.maybeQuotedName(), table.maybeQuotedName());
+ return describe(session, statement);
+ }
+
+ private List<String> describe(Session session, String describeStatement)
+ {
+ try
+ {
+ List<Row> rows = session.execute(describeStatement).all();
+ List<String> result = new ArrayList<>(rows.size());
+ for (Row row : rows)
+ {
+ String createStatement = row.getString("create_statement");
+ result.add(createStatement);
+ }
+ return result;
+ }
+ catch (InvalidQueryException e)
+ {
+ // keyspace or table not found
+ return null;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java
new file mode 100644
index 000000000..2250147ee
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL
schema for tables whose definition is not
+ * supported by driver natively. Java driver 3.x does not return {@link
TableMetadata} for tables which schema
+ * could not be parsed. Since it does not currently support vector type, any
table containing vector would not
+ * be visible. Cache is refreshed for the first time as soon as CQL connection
is established or upon first lookup.
+ * Later, it is periodically refreshed according to configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+ private static final String STATEMENT_DELIMITER = "\n\n";
+
+ private final SidecarConfiguration sidecarConfiguration;
+ private final CQLSessionProvider sessionProvider;
+ private final CQLSchemaAccessor schemaAccessor;
+
+ // cache contains only schemas unparseable by Java driver
+ // during every cache refresh, reference is being reassigned to achieve
atomic swap
+ private volatile SortedMap<QualifiedTableName, String> schemaCache;
+ private volatile boolean initialized; // flag indicating whether cache has
been populated at least once
+
+ private PreparedStatement tableListStatement;
+
+ public DriverUnsupportedSchemaCache(SidecarConfiguration
sidecarConfiguration,
+ CQLSessionProvider sessionProvider)
+ {
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.sessionProvider = sessionProvider;
+ this.schemaAccessor = new CQLSchemaAccessor(sessionProvider);
+ this.schemaCache = createCache();
+ this.initialized = false;
+ }
+
+ /**
+ * @return Schema for all tables across all keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getFullSchema()
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table -> true);
+ }
+
+ /**
+ * @return Schema for all tables within given keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getKeyspaceSchema(@NotNull Name keyspace)
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table ->
keyspace.equals(table.getKeyspace()));
+ }
+
+ /**
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+ {
+ return getTableSchema(keyspace, table, true);
+ }
+
+ /**
+ * @param allowRefresh Flag indicating whether lookup of table schema via
CQL query
+ * is allowed, when data not found in cache.
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table,
boolean allowRefresh)
+ {
+ refreshIfUninitialized();
+ QualifiedTableName name = new QualifiedTableName(keyspace, table);
+ String schema = schemaCache.get(name);
+ if (schema == null && allowRefresh)
+ {
+ // proactively fetch table schema, not to provide
+ // false-negative when table is not cached yet
+ // attention, method can block
+ schema = populateSchemaCache(schemaCache, name);
+ }
+ return schema;
+ }
+
+ @Override
+ public DurationSpec delay()
+ {
+ return
sidecarConfiguration.driverConfiguration().unsupportedTableSchemaRefreshTime();
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ try
+ {
+ refresh(false);
+ promise.tryComplete();
+ }
+ catch (Throwable t)
+ {
+ promise.fail(t);
+ }
+ }
+
+ private String getUnsupportedSchema(Predicate<QualifiedTableName>
condition)
+ {
+ StringBuilder result = new StringBuilder();
+ for (Map.Entry<QualifiedTableName, String> entry :
schemaCache.entrySet())
+ {
+ if (condition.test(entry.getKey()))
+ {
+ result.append(entry.getValue()).append(STATEMENT_DELIMITER);
+ }
+ }
+ return result.toString().trim();
+ }
+
+ private void refreshIfUninitialized()
+ {
+ if (!initialized)
+ {
+ refresh(true);
+ }
+ }
+
+ public synchronized void refresh(boolean initializeOnly)
+ {
+ if (initialized && initializeOnly)
+ {
+ // cache has been already initialized, early exit
+ return;
+ }
+ try
+ {
+ Session session = sessionProvider.get();
+ prepareStatements(session);
+
+ Set<QualifiedTableName> tables = queryAllTables(session);
+ Set<QualifiedTableName> driverKnownTables =
driverKnownTables(session);
+
+ tables.removeAll(driverKnownTables);
+
+ SortedMap<QualifiedTableName, String> newCache = createCache();
+ if (!tables.isEmpty())
+ {
+ LOGGER.debug("Tables not supported by Java driver metadata: {}", tables);
+ tables.forEach(table -> populateSchemaCache(newCache, table));
+ }
+ // replacing cache, because some tables might have been removed in
the meanwhile
+ schemaCache = newCache;
+
+ initialized = true;
+ }
+ catch (CassandraUnavailableException ignored)
+ {
+ LOGGER.debug("Not yet connected to Cassandra cluster");
+ }
+ }
+
+ private String populateSchemaCache(Map<QualifiedTableName, String> cache,
QualifiedTableName table)
+ {
+ Name keyspaceName = table.getKeyspace();
+ Name tableName = table.table();
+ if (keyspaceName == null || tableName == null)
+ {
+ throw new IllegalArgumentException("Invalid table name: " + table);
+ }
+ List<String> cqlSchema = schemaAccessor.getTableSchema(keyspaceName,
tableName);
+ if (cqlSchema != null)
+ {
+ String schema = String.join(STATEMENT_DELIMITER, cqlSchema);
+ cache.put(table, schema);
+ return schema;
+ }
+ return null;
+ }
+
+ private Set<QualifiedTableName> queryAllTables(Session session)
+ {
+ List<Row> rows = session.execute(tableListStatement.bind()).all();
+ return rows.stream()
+ .map(r -> new
QualifiedTableName(r.getString("keyspace_name"),
+ r.getString("table_name")))
+ .collect(Collectors.toSet());
+ }
+
+ private Set<QualifiedTableName> driverKnownTables(Session session)
+ {
+ Set<QualifiedTableName> result = new HashSet<>();
+ Cluster cluster = session.getCluster();
+ for (KeyspaceMetadata keyspace : cluster.getMetadata().getKeyspaces())
+ {
+ for (TableMetadata table : keyspace.getTables())
+ {
+ result.add(new QualifiedTableName(keyspace.getName(),
table.getName()));
+ }
+ }
+ return result;
+ }
+
+ private void prepareStatements(Session session)
+ {
+ if (tableListStatement == null)
+ {
+ tableListStatement = session.prepare("SELECT keyspace_name, table_name FROM system_schema.tables");
+ }
+ }
+
+ private SortedMap<QualifiedTableName, String> createCache()
+ {
+ // use sorted map for repeatable results when retrieving full schema
+ // synchronized, because different threads may indirectly add items to
the map using getTableSchema() method
+ return Collections.synchronizedSortedMap(new
TreeMap<>(Comparator.comparing(QualifiedTableName::toString)));
+ }
+
+ @VisibleForTesting
+ void setInitialized(boolean initialized)
+ {
+ this.initialized = initialized;
+ }
+
+ public static String concatSchemas(String ... schemas)
+ {
+ StringBuilder result = new StringBuilder();
+ for (String schema : schemas)
+ {
+ if (StringUtils.isNotEmpty(schema))
+ {
+ result.append(schema.trim()).append(STATEMENT_DELIMITER);
+ }
+ }
+ return result.toString().trim();
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java
index c9446fe69..3a43976bf 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java
@@ -25,6 +25,7 @@ import com.datastax.driver.core.Metadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
@@ -34,6 +35,7 @@ import
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.common.response.SchemaResponse;
import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.MetadataUtils;
@@ -47,19 +49,24 @@ import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio
@Singleton
public class KeyspaceSchemaHandler extends AbstractHandler<Name> implements
AccessProtected
{
+ private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache;
+
/**
* Constructs a handler with the provided {@code metadataFetcher}
*
* @param metadataFetcher the interface to retrieve metadata
* @param executorPools executor pools for blocking executions
* @param validator a validator instance to validate
Cassandra-specific input
+ * @param driverUnsupportedSchemaCache cache of unparseable table schemas
by Java driver
*/
@Inject
protected KeyspaceSchemaHandler(InstanceMetadataFetcher metadataFetcher,
ExecutorPools executorPools,
- CassandraInputValidator validator)
+ CassandraInputValidator validator,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache)
{
super(metadataFetcher, executorPools, validator);
+ this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache;
}
@Override
@@ -78,9 +85,9 @@ public class KeyspaceSchemaHandler extends
AbstractHandler<Name> implements Acce
SocketAddress remoteAddress,
Name keyspace)
{
- metadata(host)
- .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, keyspace))
- .onSuccess(metadata -> handleWithMetadata(context, keyspace,
metadata));
+ Future.all(metadata(host), unsupportedSchema(keyspace))
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, keyspace))
+ .onSuccess(future -> handleWithMetadata(context, keyspace,
future));
}
/**
@@ -88,13 +95,19 @@ public class KeyspaceSchemaHandler extends
AbstractHandler<Name> implements Acce
*
* @param context the event to handle
* @param keyspace the keyspace parsed from the request
- * @param metadata the metadata on the connected cluster, including known
nodes and schema definitions
+ * @param future composite future containing result of:
+ * - the metadata on the connected cluster, including
known nodes and schema definitions
+ * - additional schema which could not be parsed by Java
driver
*/
- private void handleWithMetadata(RoutingContext context, Name keyspace,
Metadata metadata)
+ private void handleWithMetadata(RoutingContext context, Name keyspace,
CompositeFuture future)
{
+ Metadata metadata = future.resultAt(0);
+ String unparseableSchema = future.resultAt(1);
if (keyspace == null)
{
- SchemaResponse schemaResponse = new
SchemaResponse(metadata.exportSchemaAsString());
+ String fullSchema =
DriverUnsupportedSchemaCache.concatSchemas(metadata.exportSchemaAsString(),
+
unparseableSchema);
+ SchemaResponse schemaResponse = new SchemaResponse(fullSchema);
context.json(schemaResponse);
return;
}
@@ -111,8 +124,9 @@ public class KeyspaceSchemaHandler extends
AbstractHandler<Name> implements Acce
return;
}
- SchemaResponse schemaResponse = new SchemaResponse(keyspace.name(),
-
ksMetadata.exportAsString());
+ String keyspaceSchema =
DriverUnsupportedSchemaCache.concatSchemas(ksMetadata.exportAsString(),
+
unparseableSchema);
+ SchemaResponse schemaResponse = new SchemaResponse(keyspace.name(),
keyspaceSchema);
context.json(schemaResponse);
}
@@ -130,6 +144,27 @@ public class KeyspaceSchemaHandler extends
AbstractHandler<Name> implements Acce
});
}
+ /**
+ * Get CQL schema not parseable by Java driver (and therefore not present
in {@link Metadata}).
+ *
+ * @param keyspace optional keyspace name, {@code null} includes schema
for all keyspaces
+ * @return {@link Future} containing CQL schema
+ */
+ private Future<String> unsupportedSchema(Name keyspace)
+ {
+ return executorPools.service().executeBlocking(() -> {
+ // schema cache can block if it was not successfully initialized
at least once
+ if (keyspace == null)
+ {
+ return driverUnsupportedSchemaCache.getFullSchema();
+ }
+ else
+ {
+ return
driverUnsupportedSchemaCache.getKeyspaceSchema(keyspace);
+ }
+ });
+ }
+
/**
* Parses the request parameters
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java
index ec3f65fa3..351115b80 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java
@@ -27,6 +27,7 @@ import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -42,13 +43,15 @@ public class SchemaHandler extends KeyspaceSchemaHandler
* @param metadataFetcher the interface to retrieve metadata
* @param executorPools executor pools for blocking executions
* @param validator a validator instance to validate
Cassandra-specific input
+ * @param driverUnsupportedSchemaCache cache of unparseable table schemas
by Java driver
*/
@Inject
protected SchemaHandler(InstanceMetadataFetcher metadataFetcher,
ExecutorPools executorPools,
- CassandraInputValidator validator)
+ CassandraInputValidator validator,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache)
{
- super(metadataFetcher, executorPools, validator);
+ super(metadataFetcher, executorPools, validator,
driverUnsupportedSchemaCache);
}
@Override
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java
index d6c81e456..345eea72b 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -40,6 +41,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.handlers.AbstractHandler;
import org.apache.cassandra.sidecar.handlers.AccessProtected;
import org.apache.cassandra.sidecar.handlers.data.SSTableUploadRequestParam;
@@ -70,18 +72,20 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequestPa
private final SSTableUploadsPathBuilder uploadPathBuilder;
private final ConcurrencyLimiter limiter;
private final DigestVerifierFactory digestVerifierFactory;
+ private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache;
/**
* Constructs a handler with the provided params.
*
- * @param vertx the vertx instance
- * @param serviceConfiguration configuration object holding config
details of Sidecar
- * @param metadataFetcher the interface to retrieve metadata
- * @param uploader a class that uploads the components
- * @param uploadPathBuilder a class that provides SSTableUploads
directories
- * @param executorPools executor pools for blocking executions
- * @param validator a validator instance to validate
Cassandra-specific input
- * @param digestVerifierFactory a factory of checksum verifiers
+ * @param vertx the vertx instance
+ * @param serviceConfiguration configuration object holding config
details of Sidecar
+ * @param metadataFetcher the interface to retrieve metadata
+ * @param uploader a class that uploads the components
+ * @param uploadPathBuilder a class that provides
SSTableUploads directories
+ * @param executorPools executor pools for blocking
executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ * @param digestVerifierFactory a factory of checksum verifiers
+ * @param driverUnsupportedSchemaCache cache of table schemas that the Java
driver cannot parse
*/
@Inject
protected SSTableUploadHandler(Vertx vertx,
@@ -91,7 +95,8 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequestPa
SSTableUploadsPathBuilder uploadPathBuilder,
ExecutorPools executorPools,
CassandraInputValidator validator,
- DigestVerifierFactory digestVerifierFactory)
+ DigestVerifierFactory digestVerifierFactory,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache)
{
super(metadataFetcher, executorPools, validator);
this.fs = vertx.fileSystem();
@@ -100,6 +105,7 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequestPa
this.uploadPathBuilder = uploadPathBuilder;
this.limiter = new
ConcurrencyLimiter(configuration::concurrentUploadsLimit);
this.digestVerifierFactory = digestVerifierFactory;
+ this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache;
}
@Override
@@ -200,25 +206,28 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequestPa
SSTableUploadRequestParam request)
{
TaskExecutorPool pool = executorPools.service();
- return pool.executeBlocking(() ->
metadataFetcher.delegate(host).metadata())
- .compose(metadata -> {
- KeyspaceMetadata keyspaceMetadata =
MetadataUtils.keyspace(metadata, request.keyspace());
- if (keyspaceMetadata == null)
- {
- String message = String.format("Invalid keyspace
'%s' supplied", request.keyspace());
- logger.error(message);
- return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
- }
+ return Future.all(pool.executeBlocking(() ->
metadataFetcher.delegate(host).metadata()),
+ pool.executeBlocking(() ->
driverUnsupportedSchemaCache.getTableSchema(request.keyspace(),
request.table(), false)))
+ .compose(compositeFuture -> {
+ Metadata metadata = compositeFuture.resultAt(0);
+ String unparseableSchema =
compositeFuture.resultAt(1);
- if (MetadataUtils.table(keyspaceMetadata,
request.table()) == null)
- {
- String message = String.format("Invalid table name
'%s' supplied for keyspace '%s'",
- request.table(),
request.keyspace());
- logger.error(message);
- return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
- }
- return Future.succeededFuture(request);
- });
+ KeyspaceMetadata keyspaceMetadata =
MetadataUtils.keyspace(metadata, request.keyspace());
+ if (keyspaceMetadata == null)
+ {
+ String message = String.format("Invalid keyspace
'%s' supplied", request.keyspace());
+ logger.error(message);
+ return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
+ }
+ if (MetadataUtils.table(keyspaceMetadata,
request.table()) == null && unparseableSchema == null)
+ {
+ String message = String.format("Invalid table
name '%s' supplied for keyspace '%s'",
+ request.table(),
request.keyspace());
+ logger.error(message);
+ return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
+ }
+ return Future.succeededFuture(request);
+ });
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java
index 24a3ec9db..136f2770f 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java
@@ -27,8 +27,10 @@ import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.handlers.AbstractHandler;
import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
@@ -46,12 +48,16 @@ import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio
@Singleton
public class ValidateTableExistenceHandler extends
AbstractHandler<QualifiedTableName>
{
+ private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache;
+
@Inject
public ValidateTableExistenceHandler(InstanceMetadataFetcher
metadataFetcher,
ExecutorPools executorPools,
- CassandraInputValidator validator)
+ CassandraInputValidator validator,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache)
{
super(metadataFetcher, executorPools, validator);
+ this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache;
}
// It is a validator, and it does not assume values (keyspace and table)
are present.
@@ -76,9 +82,13 @@ public class ValidateTableExistenceHandler extends
AbstractHandler<QualifiedTabl
return;
}
- getKeyspaceMetadata(host, input.maybeQuotedKeyspace())
+ Future.all(getKeyspaceMetadata(host, input.maybeQuotedKeyspace()),
+ unsupportedSchema(input.maybeQuotedKeyspace(),
input.maybeQuotedTableName()))
.onFailure(context::fail) // fail the request with the internal server
error thrown from getKeyspaceMetadata
- .onSuccess(keyspaceMetadata -> {
+ .onSuccess(compositeFuture -> {
+ KeyspaceMetadata keyspaceMetadata = compositeFuture.resultAt(0);
+ String unparseableSchema = compositeFuture.resultAt(1);
+
if (keyspaceMetadata == null)
{
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
@@ -97,15 +107,20 @@ public class ValidateTableExistenceHandler extends
AbstractHandler<QualifiedTabl
}
TableMetadata tableMetadata = keyspaceMetadata.getTable(table);
- if (tableMetadata == null)
+ boolean tableExists = tableMetadata != null || unparseableSchema
!= null;
+ if (!tableExists)
{
String errMsg = "Table " + input.tableName() + " was not found
for keyspace " + input.keyspace();
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
errMsg));
}
else
{
- RoutingContextUtils.put(context,
RoutingContextUtils.SC_TABLE_METADATA, tableMetadata);
// keyspace / [table] exists
+ if (tableMetadata != null)
+ {
+ // SC_TABLE_METADATA will not be present for unparseable
tables
+ RoutingContextUtils.put(context,
RoutingContextUtils.SC_TABLE_METADATA, tableMetadata);
+ }
context.next();
}
});
@@ -118,4 +133,18 @@ public class ValidateTableExistenceHandler extends
AbstractHandler<QualifiedTabl
.metadata()
.getKeyspace(keyspace));
}
+
+ private Future<String> unsupportedSchema(String keyspace, String table)
+ {
+ return executorPools.service().executeBlocking(() -> {
+ if (table == null)
+ {
+ return null;
+ }
+ // schema cache can block if it was not successfully initialized
at least once
+ return driverUnsupportedSchemaCache.getTableSchema(new
Name(keyspace),
+ new Name(table),
+ false);
+ });
+ }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index ddf094f9c..fec382a4b 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -70,6 +70,7 @@ import
org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
import org.apache.cassandra.sidecar.db.CdcConfigAccessor;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.db.KafkaConfigAccessor;
import org.apache.cassandra.sidecar.db.TokenSplitConfigAccessor;
import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
@@ -128,10 +129,11 @@ public class CdcModule extends AbstractModule
@Singleton
CassandraClusterSchemaMonitor
cassandraClusterSchemaMonitorInstance(InstanceMetadataFetcher
instanceMetadataFetcher,
CdcDatabaseAccessor databaseAccessor,
+
DriverUnsupportedSchemaCache driverUnsupportedSchemaCache,
SidecarConfiguration configuration,
CassandraBridgeFactory cassandraBridgeFactory)
{
- return new CassandraClusterSchemaMonitor(instanceMetadataFetcher,
databaseAccessor, configuration, cassandraBridgeFactory);
+ return new CassandraClusterSchemaMonitor(instanceMetadataFetcher,
databaseAccessor, driverUnsupportedSchemaCache, configuration,
cassandraBridgeFactory);
}
@ProvidesIntoMap
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
index 785104b6a..983a6195d 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
@@ -32,6 +32,7 @@ import com.datastax.driver.core.NettyOptions;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoMap;
import com.google.inject.name.Named;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
@@ -54,12 +55,18 @@ import org.apache.cassandra.sidecar.config.JmxConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
+import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
+import org.apache.cassandra.sidecar.modules.multibindings.PeriodicTaskMapKeys;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
+import org.apache.cassandra.sidecar.utils.EventBusUtils;
import static
org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
/**
@@ -126,6 +133,30 @@ public class ConfigurationModule extends AbstractModule
return cqlSessionProvider;
}
+ @Provides
+ @Singleton
+ DriverUnsupportedSchemaCache driverUnsupportedSchemaCache(Vertx vertx,
+
SidecarConfiguration sidecarConfiguration,
+
CQLSessionProvider cqlSessionProvider)
+ {
+ DriverUnsupportedSchemaCache schemaCache = new
DriverUnsupportedSchemaCache(sidecarConfiguration, cqlSessionProvider);
+ // trigger immediate cache population once Sidecar connects to
Cassandra node
+ EventBusUtils.onceLocalConsumer(vertx.eventBus(),
+ ON_CASSANDRA_CQL_READY.address(),
+ ignored -> vertx.executeBlocking(() ->
{
+ schemaCache.refresh(true);
+ return null;
+ }));
+ return schemaCache;
+ }
+
+ @ProvidesIntoMap
+ @KeyClassMapKey(PeriodicTaskMapKeys.UnsupportedSchemaCacheTaskKey.class)
+ PeriodicTask
driverUnsupportedSchemaCachePeriodicTask(DriverUnsupportedSchemaCache
schemaCache)
+ {
+ return schemaCache;
+ }
+
@Provides
@Singleton
CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver,
DriverUtils driverUtils, TableSchemaFetcher tableSchemaFetcher)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
index 293a57a22..fcf7055fa 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
@@ -37,4 +37,5 @@ public interface PeriodicTaskMapKeys
interface CdcPublisherTaskKey extends ClassKey {}
interface CdcConfigRefresherNotifierKey extends ClassKey {}
interface CassandraClusterSchemaTaskKey extends ClassKey {}
+ interface UnsupportedSchemaCacheTaskKey extends ClassKey {}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
index 4d9df6c5d..79049de51 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
@@ -33,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.inject.Singleton;
import io.vertx.core.Promise;
import org.apache.cassandra.bridge.CassandraBridge;
@@ -44,6 +43,7 @@ import
org.apache.cassandra.sidecar.common.response.NodeSettings;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.utils.CdcUtil;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.spark.data.CqlTable;
@@ -69,6 +69,7 @@ public class CassandraClusterSchemaMonitor implements
PeriodicTask
private final AtomicReference<Set<CqlTable>> cdcTables = new
AtomicReference<>(Collections.emptySet());
private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new
ConcurrentHashMap<>();
private final CdcDatabaseAccessor databaseAccessor;
+ private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache;
private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new
CopyOnWriteArrayList<>();
private final SidecarConfiguration sidecarConfiguration;
private final InstanceMetadataFetcher instanceFetcher;
@@ -76,12 +77,14 @@ public class CassandraClusterSchemaMonitor implements
PeriodicTask
public CassandraClusterSchemaMonitor(InstanceMetadataFetcher
instanceFetcher,
CdcDatabaseAccessor databaseAccessor,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache,
SidecarConfiguration
sidecarConfiguration,
CassandraBridgeFactory
cassandraBridgeFactory)
{
this.instanceFetcher = instanceFetcher;
this.databaseAccessor = databaseAccessor;
+ this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache;
this.sidecarConfiguration = sidecarConfiguration;
this.cassandraBridgeFactory = cassandraBridgeFactory;
}
@@ -93,14 +96,15 @@ public class CassandraClusterSchemaMonitor implements
PeriodicTask
public void refresh()
{
- NodeSettings nodeSettings =
instanceFetcher.callOnFirstAvailableInstance(instance->
instance.delegate().nodeSettings());
+ NodeSettings nodeSettings =
instanceFetcher.callOnFirstAvailableInstance(instance ->
instance.delegate().nodeSettings());
CassandraBridge cassandraBridge =
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
try
{
LOGGER.debug("Checking for schema changes...");
- String fullSchemaText = databaseAccessor.fullSchema();
+ String fullSchemaText =
DriverUnsupportedSchemaCache.concatSchemas(databaseAccessor.fullSchema(),
+
driverUnsupportedSchemaCache.getFullSchema());
if (!fullSchemaText.equals(currSchemaText.get()))
{
LOGGER.info("Schema change detected, refreshing CDC tables");
@@ -177,10 +181,13 @@ public class CassandraClusterSchemaMonitor implements
PeriodicTask
@VisibleForTesting
static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor
cdcDatabaseAccessor,
+ DriverUnsupportedSchemaCache
driverUnsupportedSchemaCache,
ConcurrentHashMap<TableIdentifier,
UUID> tableIdCache,
@NotNull final CassandraBridge
cassandraBridge)
{
- return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+ String fullSchema =
DriverUnsupportedSchemaCache.concatSchemas(cdcDatabaseAccessor.fullSchema(),
+
driverUnsupportedSchemaCache.getFullSchema());
+ return buildCdcTables(fullSchema,
cdcDatabaseAccessor.partitioner(),
tableIdCache,
cdcDatabaseAccessor::getTableId,
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableVectorUploadHandlerIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableVectorUploadHandlerIntegrationTest.java
new file mode 100644
index 000000000..2f72d7c11
--- /dev/null
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableVectorUploadHandlerIntegrationTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableImportHandler;
+import
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableUploadHandler;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.IClusterExtension;
+import org.apache.cassandra.testing.utils.AssertionUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for {@link SSTableUploadHandler}, {@link
SSTableImportHandler} and vector type
+ * TODO: Consider removing after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@ExtendWith(VertxExtension.class)
+public class SSTableVectorUploadHandlerIntegrationTest extends
IntegrationTestBase
+{
+ public static final SimpleCassandraVersion MIN_VERSION_WITH_VECTOR =
SimpleCassandraVersion.create("5.0.0");
+
+ @CassandraIntegrationTest
+ void testSSTableImport(VertxTestContext vertxTestContext)
+ throws Exception
+ {
+ assumeThat(sidecarTestContext.version)
+ .as("Vector type is supported since Cassandra 5.0")
+ .isGreaterThanOrEqualTo(MIN_VERSION_WITH_VECTOR);
+
+ // Create a table. Insert some data, create a snapshot that we'll use
for import.
+ // Move snapshot files to a temporary destination.
+ // Truncate the table and insert more data.
+ // Upload the SSTables.
+ // Test the import SSTable endpoint by importing data that was
originally truncated.
+ // Verify by querying the table contains all the results before
truncation and after truncation.
+
+ createTestKeyspace();
+ Session session = maybeGetSession();
+ QualifiedTableName tableName =
createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
+
+ // wait for schema to be present
+ // periodic refresh of Java driver unsupported schemas is configured
to 3 seconds
+ AssertionUtils.loopAssert(15, 1000, () -> {
+ String testSchemaRoute = "/api/v1/cassandra/schema";
+ HttpResponse<Buffer> response = client.get(server.actualPort(),
"127.0.0.1", testSchemaRoute).send()
+
.toCompletionStage().toCompletableFuture().join();
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+
assertThat(response.bodyAsString()).contains(tableName.tableName());
+ });
+
+ // create a snapshot called <tableName>-snapshot for tbl1
+ IClusterExtension<? extends IInstance> cluster =
sidecarTestContext.cluster();
+ String snapshotStdout = cluster.get(1).nodetoolResult("snapshot",
+ "--tag",
tableName.tableName() + "-snapshot",
+ "--table",
tableName.tableName(),
+ "--",
tableName.keyspace()).getStdout();
+ assertThat(snapshotStdout).contains("Snapshot directory: " +
tableName.tableName() + "-snapshot");
+ // find the directory in the filesystem
+ final List<Path> snapshotFiles = findChildFile(sidecarTestContext,
"127.0.0.1",
+ tableName.keyspace(),
tableName.tableName() + "-snapshot");
+
+ assertThat(snapshotFiles).isNotEmpty();
+
+ // copy snapshot files into the temporary directory
+ File tmpDir = new File("/tmp", UUID.randomUUID().toString());
+ boolean mkdirs = tmpDir.mkdirs();
+ assertThat(mkdirs)
+ .withFailMessage("Could not create directory " + tmpDir)
+ .isTrue();
+ for (Path path : snapshotFiles)
+ {
+ if (path.toFile().isFile())
+ {
+ Files.copy(path, tmpDir.toPath().resolve(path.getFileName()));
+ }
+ }
+
+ // Now truncate the contents of the table
+ truncateAndVerify(tableName);
+
+ // Add new data (c, d) to table
+ populateTable(session, tableName, Arrays.asList("c", "d"));
+
+ WebClient client = mTLSClient();
+
+ // Upload sstables from the snapshot
+ final UUID uploadId = UUID.randomUUID();
+ String testUploadRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/"
+ tableName.keyspace()
+ + "/tables/" + tableName.tableName() +
"/components/";
+ for (Path path : snapshotFiles)
+ {
+ if (path.toFile().isFile())
+ {
+ Path sstableComponent =
tmpDir.toPath().resolve(path.getFileName());
+ client.put(server.actualPort(), "127.0.0.1", testUploadRoute +
path.getFileName())
+
.sendStream(vertx.fileSystem().openBlocking(sstableComponent.toFile().getAbsolutePath(),
new OpenOptions().setRead(true)),
+ vertxTestContext.succeeding(response ->
vertxTestContext.verify(() -> {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ vertxTestContext.completeNow();
+ })));
+ }
+ }
+ assertThat(vertxTestContext.awaitCompletion(30,
TimeUnit.SECONDS)).isTrue();
+
+ String testImportRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/"
+ tableName.keyspace()
+ + "/tables/" + tableName.tableName() +
"/import";
+ sendRequest(vertxTestContext,
+ () -> client.put(server.actualPort(), "127.0.0.1",
testImportRoute),
+ vertxTestContext.succeeding(response ->
vertxTestContext.verify(() -> {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ assertThat(queryValues(tableName))
+ .containsAll(Arrays.asList("a", "b", "c", "d"));
+ vertxTestContext.completeNow();
+ })));
+ // wait until test completes
+ assertThat(vertxTestContext.awaitCompletion(30,
TimeUnit.SECONDS)).isTrue();
+ }
+
+ private void sendRequest(VertxTestContext vertxTestContext,
Supplier<HttpRequest<Buffer>> requestSupplier,
+ Handler<AsyncResult<HttpResponse<Buffer>>>
handler)
+ {
+ requestSupplier.get()
+ .send(vertxTestContext.succeeding(r ->
vertxTestContext.verify(() -> {
+ int statusCode = r.statusCode();
+ if (statusCode ==
HttpResponseStatus.ACCEPTED.code())
+ {
+ // retry the request every second when the
request is accepted
+ vertx.setTimer(1000, tid ->
sendRequest(vertxTestContext, requestSupplier, handler));
+ }
+ else
+ {
+ handler.handle(Future.succeededFuture(r));
+ }
+ })));
+ }
+
+ private void truncateAndVerify(QualifiedTableName qualifiedTableName)
+ throws InterruptedException
+ {
+ Session session = maybeGetSession();
+ session.execute("TRUNCATE TABLE " + qualifiedTableName);
+
+ while (true)
+ {
+ TimeUnit.MILLISECONDS.sleep(100);
+ // use count, because Java driver may fail to deserialize unknown
types
+ long count = session.execute("SELECT COUNT(*) FROM " +
qualifiedTableName).one().getLong(0);
+ if (count == 0)
+ break; // truncate succeeded
+ }
+ }
+
+ private List<String> queryValues(QualifiedTableName tableName)
+ {
+ Session session = maybeGetSession();
+ return session.execute("SELECT id FROM " + tableName)
+ .all()
+ .stream()
+ .map(row -> row.getString("id"))
+ .collect(Collectors.toList());
+ }
+
+ protected QualifiedTableName
createTestTableAndPopulate(CassandraSidecarTestContext cassandraTestContext,
+ List<String>
values)
+ {
+ QualifiedTableName tableName = createTestTable(
+ "CREATE TABLE IF NOT EXISTS %s (id text, value vector<float, 3>,
PRIMARY KEY(id))" + WITH_COMPACTION_DISABLED + ";");
+ Session session = maybeGetSession();
+ populateTable(session, tableName, values);
+ return tableName;
+ }
+
+ private void populateTable(Session session, QualifiedTableName tableName,
List<String> values)
+ {
+ float index = 1;
+ for (String value : values)
+ {
+ session.execute(String.format("INSERT INTO %s (id, value) VALUES
('%s', [%f, %f, %f]);", tableName, value, index, index, index));
+ index++;
+ }
+ }
+}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index 7d985e2d4..918fdcdd3 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.Session;
import com.google.inject.AbstractModule;
@@ -39,6 +40,7 @@ import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfigur
import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
@@ -48,6 +50,7 @@ import
org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.CacheConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.ClusterLeaseClaimConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.DriverConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
@@ -60,6 +63,7 @@ import org.apache.cassandra.sidecar.coordination.ClusterLease;
import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
import
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.modules.multibindings.ClassKey;
@@ -70,6 +74,7 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
import org.jetbrains.annotations.NotNull;
+import static
org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
/**
@@ -121,6 +126,10 @@ public class IntegrationTestModule extends AbstractModule
MillisecondBoundConfiguration.parse("50ms"),
MillisecondBoundConfiguration.parse("5000ms"));
+ DriverConfiguration driverConfiguration = new
DriverConfigurationImpl(Collections.emptyList(), "dc1",
+
1, "cassandra", "cassandra",
+
null, new SecondBoundConfiguration(3, TimeUnit.SECONDS));
+
SslConfiguration sslConfiguration =
SslConfigurationImpl.builder()
.enabled(true)
@@ -138,6 +147,7 @@ public class IntegrationTestModule extends AbstractModule
.accessControlConfiguration(accessControlConfiguration)
.serviceConfiguration(conf)
.healthCheckConfiguration(healthCheckConfiguration)
+
.driverConfiguration(driverConfiguration)
.build();
}
@@ -227,7 +237,12 @@ public class IntegrationTestModule extends AbstractModule
@NotNull
public Session get()
{
- return cassandraSidecarTestContext.session();
+ Session session = cassandraSidecarTestContext.session(); // may be null; checked below to honour the NotNull contract
+ if (session == null) // no CQL session in the test context: throw CassandraUnavailableException rather than return null
+ {
+ throw new CassandraUnavailableException(CQL, "CQL session
unavailable");
+ }
+ return session;
}
@Override
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 87c33d606..0494bbe92 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -64,6 +64,7 @@ import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
import org.apache.cassandra.sidecar.config.yaml.ThrottleConfigurationImpl;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
@@ -253,4 +254,14 @@ public class TestModule extends AbstractModule
{
return CassandraClientTokenRingProviderTest.mockDnsResolver();
}
+
+ @Provides
+ @Singleton
+ public DriverUnsupportedSchemaCache driverUnsupportedSchemaCache() // test binding: supplies a Mockito stand-in instead of a real schema cache
+ {
+ DriverUnsupportedSchemaCache mock =
mock(DriverUnsupportedSchemaCache.class); // the local named mock holds the stand-in; mock(...) is the Mockito factory call
+ when(mock.getFullSchema()).thenReturn(""); // full-schema lookups yield an empty string
+ when(mock.getKeyspaceSchema(any())).thenReturn(""); // keyspace-schema lookups yield an empty string for any argument
+ return mock;
+ }
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java
new file mode 100644
index 000000000..e9c6ad626
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.startsWith;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CQLSchemaAccessorTest // unit tests for CQLSchemaAccessor, run against a mocked CQLSessionProvider
+{
+ static final Name KEYSPACE = new Name("keyspace1"); // keyspace name the mocked session reports
+ static final Name TABLE = new Name("table1"); // the one table the mocked session is able to describe
+ static final String SCHEMA = "CREATE TABLE keyspace1.table1 (a int, b int,
PRIMARY KEY (a));";
+
+ CQLSchemaAccessor schemaAccessor; // accessor under test, assigned in setUp
+
+ @BeforeEach
+ void setUp()
+ {
+ CQLSessionProvider sessionProvider =
mockCQLSessionProvider(KEYSPACE.name(), TABLE.name(), SCHEMA);
+ schemaAccessor = new CQLSchemaAccessor(sessionProvider); // fresh accessor over the mocked provider before every test
+ }
+
+ @Test
+ void testGetKeyspaceNames() // expects exactly the single mocked keyspace
+ {
+ assertThat(schemaAccessor.getKeyspaces()).containsExactly(KEYSPACE);
+ }
+
+ @Test
+ void testGetExistingTable() // expects exactly the create statement stubbed for the known table
+ {
+ assertThat(schemaAccessor.getTableSchema(KEYSPACE,
TABLE)).containsExactly(SCHEMA);
+ }
+
+ @Test
+ void testGetNotExistingTable() // expects null for a table whose DESCRIBE TABLE is stubbed to throw
+ {
+ assertThat(schemaAccessor.getTableSchema(KEYSPACE, new
Name("unknown"))).isNull();
+ }
+
+ static CQLSessionProvider mockCQLSessionProvider(String keyspace, String
table, String schema) // builds a provider whose session answers DESCRIBE KEYSPACES and DESCRIBE TABLE for the given names; also reused by DriverUnsupportedSchemaCacheTest
+ {
+ Session session = mock(Session.class, RETURNS_DEEP_STUBS); // deep-stub mock; only the execute calls below are stubbed explicitly
+
+ when(session.execute(eq("DESCRIBE KEYSPACES"))).then(invocation -> { // answers with a single row carrying keyspace_name
+ ResultSet resultSet = mock(ResultSet.class);
+ Row row = mockRow(Map.of("keyspace_name", keyspace));
+ when(resultSet.all()).thenReturn(List.of(row));
+ return resultSet;
+ });
+
+ when(session.execute(startsWith("DESCRIBE TABLE"))).thenThrow(new
InvalidQueryException("Unknown table")); // catch-all, stubbed first: any statement starting with DESCRIBE TABLE throws
+
+ String describeTable = String.format("DESCRIBE TABLE %s.%s", keyspace,
table);
+ when(session.execute(eq(describeTable))).then(invocation -> { // exact statement for the known table; stubbed after the catch-all, and testGetExistingTable depends on this stub taking effect
+ ResultSet resultSet = mock(ResultSet.class);
+ Row row = mockRow(Map.of("create_statement", schema));
+ when(resultSet.all()).thenReturn(List.of(row));
+ return resultSet;
+ });
+
+ CQLSessionProvider cqlSession = mock(CQLSessionProvider.class);
+ when(cqlSession.get()).thenReturn(session); // the provider always hands out the session built above
+
+ return cqlSession;
+ }
+
+ static Row mockRow(Map<String, String> values) // mocked Row whose getString(column) returns the mapped value for each entry
+ {
+ Row row = mock(Row.class);
+ values.forEach((k, v) -> when(row.getString(k)).thenReturn(v));
+ return row;
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java
new file mode 100644
index 000000000..0c9de1ebe
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+
+import static org.apache.cassandra.sidecar.db.CQLSchemaAccessorTest.mockRow;
+import static
org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class DriverUnsupportedSchemaCacheTest // unit tests for DriverUnsupportedSchemaCache over a mocked CQL session; SCHEMA declares a vector<float, 3> column
+{
+ static final Name KEYSPACE = new Name("keyspace1"); // keyspace name returned by the mocked queries
+ static final Name TABLE = new Name("table1"); // table name returned by the mocked queries
+ static final String SCHEMA = "CREATE TABLE keyspace1.table1 (a int, b
vector<float, 3>, PRIMARY KEY (a));";
+
+ CQLSessionProvider sessionProvider; // mocked provider, re-stubbed to throw in testSchemaLookupAfterRefresh
+ Session session; // mocked session obtained from the provider
+ DriverUnsupportedSchemaCache schemaCache; // cache under test, rebuilt in setUp
+
+ @BeforeEach
+ void setUp()
+ {
+ sessionProvider =
CQLSchemaAccessorTest.mockCQLSessionProvider(KEYSPACE.name(), TABLE.name(),
SCHEMA); // reuses the DESCRIBE stubs defined in CQLSchemaAccessorTest
+ session = sessionProvider.get();
+ mockPreparedQuery(session,
+ "SELECT keyspace_name, table_name FROM
system_schema.tables",
+ List.of(Map.of("keyspace_name", KEYSPACE.name(),
"table_name", TABLE.name()))); // the prepared table listing yields the single keyspace1.table1 row
+ SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration();
+ schemaCache = new DriverUnsupportedSchemaCache(sidecarConfiguration,
sessionProvider);
+ }
+
+ @Test
+ void testImmediateSchemaLookup() // a freshly built cache, with no execute call made, already answers lookups with the mocked schema
+ {
+ assertThat(schemaCache.getFullSchema()).isEqualTo(SCHEMA);
+ assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo(SCHEMA);
+ assertThat(schemaCache.getTableSchema(KEYSPACE,
TABLE)).isEqualTo(SCHEMA);
+
+ assertThat(schemaCache.getTableSchema(new Name("unknown"),
TABLE)).isNull(); // unknown keyspace gives null
+ assertThat(schemaCache.getTableSchema(KEYSPACE, new
Name("unknown"))).isNull(); // unknown table gives null
+ }
+
+ @Test
+ void testSchemaLookupAfterRefresh() // lookups return empty strings before execute runs, and the schema afterwards even once the provider throws
+ {
+ // mark cache as initialized, but effectively empty
+ schemaCache.setInitialized(true);
+
+ assertThat(schemaCache.getFullSchema()).isEqualTo("");
+ assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo("");
+
+ Promise<Void> p = Promise.promise();
+ schemaCache.execute(p); // run the cache task once, handing it the promise
+ Future<Void> future = p.future();
+ assertThat(future.succeeded()).isTrue(); // NOTE(review): checked immediately, so this assumes execute completes the promise before returning - confirm
+
+ // simulate Cassandra unavailability to verify data is taken from cache
+ when(sessionProvider.get()).thenThrow(new
CassandraUnavailableException(CQL, "CQL unavailable"));
+
+ assertThat(schemaCache.getFullSchema()).isEqualTo(SCHEMA); // with the provider now throwing, these lookups must still return the schema
+ assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo(SCHEMA);
+ assertThat(schemaCache.getTableSchema(KEYSPACE,
TABLE)).isEqualTo(SCHEMA);
+ }
+
+ static SidecarConfiguration mockSidecarConfiguration() // mocked SidecarConfiguration stubbing only driverConfiguration().unsupportedTableSchemaRefreshTime(), fixed at 5 seconds
+ {
+ SidecarConfiguration sidecarConfiguration =
mock(SidecarConfiguration.class);
+ DriverConfiguration driverConfiguration =
mock(DriverConfiguration.class);
+
when(driverConfiguration.unsupportedTableSchemaRefreshTime()).thenReturn(new
SecondBoundConfiguration(5, TimeUnit.SECONDS));
+
when(sidecarConfiguration.driverConfiguration()).thenReturn(driverConfiguration);
+ return sidecarConfiguration;
+ }
+
+ static void mockPreparedQuery(Session session, String statement,
List<Map<String, String>> rows) // stubs prepare(statement), bind() and execute so the query returns one mocked Row per map in rows
+ {
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ BoundStatement boundStatement = mock(BoundStatement.class);
+ when(session.prepare(eq(statement))).thenReturn(preparedStatement);
+ when(preparedStatement.bind()).thenReturn(boundStatement);
+ when(session.execute(eq(boundStatement))).then(invocation -> { // the ResultSet and its rows are mocked afresh on every execute call
+ ResultSet resultSet = mock(ResultSet.class);
+ List<Row> mockRows = new ArrayList<>();
+ rows.forEach(row -> mockRows.add(mockRow(row)));
+ when(resultSet.all()).thenReturn(mockRows);
+ return resultSet;
+ });
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java
index fbda6c963..37ebd3e50 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java
@@ -53,12 +53,14 @@ import
org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -134,7 +136,7 @@ class SchemaHandlerTest
JsonObject jsonObject = response.bodyAsJsonObject();
assertThat(jsonObject.getString("keyspace")).isEqualTo("testKeyspace");
assertThat(jsonObject.getString("schema"))
- .isEqualTo(testKeyspaceSchema);
+ .isEqualTo(testKeyspaceSchema.trim());
context.completeNow();
})));
}
@@ -186,5 +188,15 @@ class SchemaHandlerTest
return mockInstancesMetadata;
}
+
+ @Provides
+ @Singleton
+ public DriverUnsupportedSchemaCache driverUnsupportedSchemaCache() // test binding: supplies a Mockito stand-in instead of a real schema cache
+ {
+ DriverUnsupportedSchemaCache schemaAccessor =
mock(DriverUnsupportedSchemaCache.class); // despite the local name, this is a mock of the cache type
+ when(schemaAccessor.getFullSchema()).thenReturn(""); // full-schema lookups yield an empty string
+ when(schemaAccessor.getKeyspaceSchema(any())).thenReturn(""); // keyspace-schema lookups yield an empty string for any argument
+ return schemaAccessor;
+ }
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java
index 498a9e623..13d58f952 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.vertx.core.Promise;
-
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CdcBridge;
@@ -42,6 +41,7 @@ import
org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.utils.CdcUtil;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.spark.data.CqlTable;
@@ -49,7 +49,6 @@ import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.cassandra.spark.utils.TableIdentifier;
-
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -73,6 +72,7 @@ class CassandraClusterSchemaMonitorTest
private CassandraClusterSchemaMonitor clusterSchema;
private InstanceMetadataFetcher mockInstanceFetcher;
private CdcDatabaseAccessor mockDatabaseAccessor;
+ private DriverUnsupportedSchemaCache mockDriverUnsupportedSchemaCache;
private SidecarConfiguration mockSidecarConfiguration;
private ServiceConfiguration mockServiceConfiguration;
private CdcConfiguration mockCdcConfiguration;
@@ -102,6 +102,7 @@ class CassandraClusterSchemaMonitorTest
{
mockInstanceFetcher = mock(InstanceMetadataFetcher.class);
mockDatabaseAccessor = mock(CdcDatabaseAccessor.class);
+ mockDriverUnsupportedSchemaCache =
mock(DriverUnsupportedSchemaCache.class);
mockSidecarConfiguration = mock(SidecarConfiguration.class);
mockServiceConfiguration = mock(ServiceConfiguration.class);
mockCdcConfiguration = mock(CdcConfiguration.class);
@@ -129,6 +130,7 @@ class CassandraClusterSchemaMonitorTest
// Setup database accessor
when(mockDatabaseAccessor.fullSchema()).thenReturn(INITIAL_SCHEMA);
+ when(mockDriverUnsupportedSchemaCache.getFullSchema()).thenReturn("");
when(mockDatabaseAccessor.partitioner()).thenReturn(Partitioner.Murmur3Partitioner);
when(mockDatabaseAccessor.getTableId(any(TableIdentifier.class))).thenReturn(UUID.randomUUID());
when(mockCassandraBridgeFactory.get(anyString())).thenReturn(mockCassandraBridge);
@@ -136,6 +138,7 @@ class CassandraClusterSchemaMonitorTest
clusterSchema = new CassandraClusterSchemaMonitor(
mockInstanceFetcher,
mockDatabaseAccessor,
+ mockDriverUnsupportedSchemaCache,
mockSidecarConfiguration,
mockCassandraBridgeFactory
);
@@ -223,6 +226,7 @@ class CassandraClusterSchemaMonitorTest
// Verify initial schema processing
verify(mockDatabaseAccessor, times(1)).fullSchema();
+ verify(mockDriverUnsupportedSchemaCache, times(1)).getFullSchema();
verify(mockCdcBridge, times(1)).updateCdcSchema(any(Set.class),
eq(Partitioner.Murmur3Partitioner), any());
// Second refresh - should detect schema change and update
@@ -230,6 +234,7 @@ class CassandraClusterSchemaMonitorTest
// Verify schema change detection and update
verify(mockDatabaseAccessor, times(2)).fullSchema();
+ verify(mockDriverUnsupportedSchemaCache, times(2)).getFullSchema();
verify(mockCdcBridge, times(2)).updateCdcSchema(any(Set.class),
eq(Partitioner.Murmur3Partitioner), any());
}
}
@@ -420,6 +425,7 @@ class CassandraClusterSchemaMonitorTest
Set<CqlTable> result =
CassandraClusterSchemaMonitor.buildCdcTables(
mockDatabaseAccessor,
+ mockDriverUnsupportedSchemaCache,
tableIdCache,
mockCassandraBridge
);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org