This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new c943b2c CASSANALYTICS-8: Add Schema Store interfaces for CDC (#100)
c943b2c is described below
commit c943b2c656f07188c9c9eda6b977545db8b20b30
Author: Bernardo Botella <[email protected]>
AuthorDate: Thu Feb 13 11:57:31 2025 -0800
CASSANALYTICS-8: Add Schema Store interfaces for CDC (#100)
Patch by Bernardo Botella Corbi; Reviewed by Francisco Guerrero, James
Berragan, Yifan Cai for CASSANALYTICS-8
---
CHANGES.txt | 1 +
.../build.gradle | 38 +++--
.../cdc/schemastore/LocalTableSchemaStore.java | 174 +++++++++++++++++++++
.../cdc/schemastore/PublishSchemaResult.java | 35 +++--
.../cassandra/cdc/schemastore/SchemaStore.java | 77 +++++++++
.../cdc/schemastore/TableSchemaPublisher.java | 36 +++--
.../src/main/resources/cdc_bytes.avsc | 159 +++++++++++++++++++
.../src/main/resources/cdc_generic_record.avsc | 155 ++++++++++++++++++
.../src/main/resources/table_schemas/example.avsc | 34 ++--
gradle.properties | 2 +
settings.gradle | 4 +-
11 files changed, 661 insertions(+), 54 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c98bc2..086a5e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Add Schema Store interfaces for CDC (CASSANALYTICS-8)
* Add CassandraBridge helper APIs that can be used by external tooling
(CASSANALYTICS-4)
* Refactor to decouple RowIterator and CellIterator from Spark so bulk reads
can be performed outside of Spark (CASSANDRA-20259)
* CEP-44 Kafka integration for Cassandra CDC using Sidecar (CASSANDRA-19962)
diff --git a/settings.gradle b/cassandra-analytics-cdc-codec/build.gradle
similarity index 60%
copy from settings.gradle
copy to cassandra-analytics-cdc-codec/build.gradle
index bd431fe..be13c33 100644
--- a/settings.gradle
+++ b/cassandra-analytics-cdc-codec/build.gradle
@@ -1,3 +1,5 @@
+import java.nio.file.Paths
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,17 +19,27 @@
* under the License.
*/
-rootProject.name = 'cassandra-analytics-core'
+// java-library provides the api/implementation dependency split; maven-publish provides the publishing block below
+plugins {
+ id('java-library')
+ id('maven-publish')
+}
+
+// Also build the javadoc and sources jars alongside the main jar
+java {
+ withJavadocJar()
+ withSourcesJar()
+}
+
+// Publishes the java component as a single Maven publication
+publishing {
+ publications {
+ maven(MavenPublication) {
+ from components.java
+ groupId project.group
+ artifactId "${archivesBaseName}"
+ // CODE_VERSION from the environment wins when set and non-empty, otherwise the project version is used
+ version System.getenv("CODE_VERSION") ?: "${version}"
+ }
+ }
+}
-include 'cassandra-analytics-cdc'
-include 'cassandra-analytics-common'
-include 'cassandra-bridge'
-include 'cassandra-four-zero'
-include 'cassandra-four-zero-bridge'
-include 'cassandra-four-zero-types'
-include 'cassandra-analytics-core'
-include 'cassandra-analytics-core-example'
-include 'cassandra-analytics-integration-framework'
-include 'cassandra-analytics-integration-tests'
-include 'cassandra-analytics-spark-converter'
-include 'cassandra-analytics-spark-four-zero-converter'
\ No newline at end of file
+// avroVersion is defined in the root gradle.properties
+// NOTE(review): the SchemaStore interface exposes avro types in its signatures; confirm whether
+// 'api' rather than 'implementation' is intended for consumers of this library
+dependencies {
+ implementation "org.apache.avro:avro:${avroVersion}"
+}
diff --git
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/LocalTableSchemaStore.java
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/LocalTableSchemaStore.java
new file mode 100644
index 0000000..12fbe09
--- /dev/null
+++
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/LocalTableSchemaStore.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+
+/**
+ * Example {@link SchemaStore} implementation backed by avro schema files shipped as resources.
+ * <p>
+ * On construction, every {@code .avsc} file found directly under the {@code table_schemas} resource
+ * folder is parsed and registered, together with a datum writer and a datum reader, under the namespace
+ * declared in the avro schema. They will be used as schemas for the tables that have CDC enabled.
+ * <p>
+ * NOTE(review): entries are keyed by the avro namespace only. Two schema files declaring the same
+ * namespace overwrite each other, and the {@code name} argument of the lookup methods is not consulted.
+ */
+public class LocalTableSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalTableSchemaStore.class);
+
+    // The three maps share the same keys (avro namespace) and are only written by loadFromResource()
+    private final Map<String, Schema> cache = new HashMap<>();
+
+    private final Map<String, GenericDatumWriter<GenericRecord>> writers = new HashMap<>();
+    private final Map<String, GenericDatumReader<GenericRecord>> readers = new HashMap<>();
+
+    // Holder idiom: the singleton is created when Holder is first initialized, i.e. on the first getInstance() call
+    private static class Holder
+    {
+        private static final LocalTableSchemaStore INSTANCE = new LocalTableSchemaStore();
+    }
+
+    /**
+     * @return the shared instance of the store
+     */
+    public static LocalTableSchemaStore getInstance()
+    {
+        return Holder.INSTANCE;
+    }
+
+    /**
+     * Creates the store and eagerly loads all the schemas from the resources.
+     *
+     * @throws RuntimeException if the schemas cannot be located or read
+     */
+    protected LocalTableSchemaStore()
+    {
+        // NOTE(review): custom avro logical types have to be registered before the schemas are loaded;
+        // no registration is performed by this class
+        loadFromResource();
+    }
+
+    /**
+     * Expects cassandra keyspace name for namespace and cassandra table name for name.
+     * Only {@code namespace} takes part in the lookup.
+     *
+     * @param namespace key the schema was registered under (the namespace declared in the avro schema)
+     * @param name      not used by this implementation
+     * @return the schema, or null if no schema was loaded for the namespace
+     */
+    @Override
+    public Schema getSchema(String namespace, String name)
+    {
+        return cache.get(namespace);
+    }
+
+    /**
+     * @param namespace key the schema was registered under (the namespace declared in the avro schema)
+     * @param name      not used by this implementation
+     * @return the writer, or null if schema is not found
+     */
+    @Override
+    public GenericDatumWriter<GenericRecord> getWriter(String namespace, String name)
+    {
+        return writers.get(namespace);
+    }
+
+    /**
+     * @param namespace key the schema was registered under (the namespace declared in the avro schema)
+     * @param name      not used by this implementation
+     * @return the reader, or null if schema is not found
+     */
+    @Override
+    public GenericDatumReader<GenericRecord> getReader(String namespace, String name)
+    {
+        return readers.get(namespace);
+    }
+
+    /**
+     * Parses every {@code .avsc} file located directly under the {@code table_schemas} resource folder
+     * and registers the schema, a datum writer and a datum reader under the schema namespace.
+     * Works both when the resources live on the plain file system and when they are packaged in a jar.
+     *
+     * @throws RuntimeException if the resource folder is missing, cannot be read, or the jar file system
+     *                          cannot be closed
+     */
+    protected void loadFromResource()
+    {
+        FileSystem jarFs = null;
+        try
+        {
+            URL url = getClass().getClassLoader().getResource("table_schemas");
+            if (url == null)
+            {
+                throw new RuntimeException("Resource table_schemas not found");
+            }
+
+            URI schemas = url.toURI();
+            Path path;
+            if (schemas.getScheme().equals("jar"))
+            {
+                // resources packaged in a jar are walked through a dedicated file system, closed in finally
+                jarFs = FileSystems.newFileSystem(schemas, new HashMap<>());
+                path = jarFs.getPath("table_schemas");
+            }
+            else
+            {
+                path = Paths.get(schemas);
+            }
+
+            // depth 1: the folder itself and its direct children; sub-folders are not descended into
+            try (Stream<Path> paths = Files.walk(path, 1))
+            {
+                // the same parser instance is used for all the files
+                Schema.Parser parser = new Schema.Parser();
+                paths.forEach(p -> {
+                    if (!p.toString().endsWith(".avsc"))
+                    {
+                        return;
+                    }
+                    // the parser leaves the stream open, so it has to be closed here
+                    try (InputStream is = Files.newInputStream(p))
+                    {
+                        Schema schema = parser.parse(is);
+                        String key = schema.getNamespace();
+                        LOGGER.info("Loading schema namespace={}", key);
+                        cache.put(key, schema);
+                        writers.put(key, new GenericDatumWriter<>(schema));
+                        readers.put(key, new GenericDatumReader<>(schema));
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+        }
+        catch (URISyntaxException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Unable to read from table_schemas", e);
+        }
+        finally
+        {
+            if (jarFs != null)
+            {
+                try
+                {
+                    jarFs.close();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException("Unable to close jar", e);
+                }
+            }
+        }
+    }
+}
diff --git a/settings.gradle
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/PublishSchemaResult.java
similarity index 60%
copy from settings.gradle
copy to
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/PublishSchemaResult.java
index bd431fe..f80d3ee 100644
--- a/settings.gradle
+++
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/PublishSchemaResult.java
@@ -17,17 +17,26 @@
* under the License.
*/
-rootProject.name = 'cassandra-analytics-core'
+package org.apache.cassandra.cdc.schemastore;
-include 'cassandra-analytics-cdc'
-include 'cassandra-analytics-common'
-include 'cassandra-bridge'
-include 'cassandra-four-zero'
-include 'cassandra-four-zero-bridge'
-include 'cassandra-four-zero-types'
-include 'cassandra-analytics-core'
-include 'cassandra-analytics-core-example'
-include 'cassandra-analytics-integration-framework'
-include 'cassandra-analytics-integration-tests'
-include 'cassandra-analytics-spark-converter'
-include 'cassandra-analytics-spark-four-zero-converter'
\ No newline at end of file
+/**
+ * Object representing the result of publishing a schema on a schema store.
+ * <p>
+ * Simple value holder: the schema id is handed to the constructor, stored in a final field
+ * without any validation, and returned as-is by {@code getSchemaId()}.
+ */
+public class PublishSchemaResult
+{
+
+ /**
+ * The id of the schema that has been published.
+ * Assigned once by the constructor and never modified afterwards.
+ */
+ private final String schemaId;
+
+ public PublishSchemaResult(String schemaId)
+ {
+ this.schemaId = schemaId;
+ }
+
+ public String getSchemaId()
+ {
+ return schemaId;
+ }
+}
diff --git
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/SchemaStore.java
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/SchemaStore.java
new file mode 100644
index 0000000..13d6489
--- /dev/null
+++
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/SchemaStore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.Nullable;
+
+/**
+ * Interface representing a CDC schema store.
+ * Schemas, datum writers and datum readers are all looked up by a (namespace, name) pair.
+ */
+public interface SchemaStore
+{
+ /**
+ * Get the avro schema corresponding to the namespace and the name of the schema
+ * @param namespace namespace of the schema to look up
+ * @param name name of the schema to look up
+ * @return avro schema, or return null if nothing can be found
+ */
+ Schema getSchema(String namespace, String name);
+
+ /**
+ * Get the schema version corresponding to the namespace and the name of the schema.
+ * The default implementation derives it from the schema returned by getSchema.
+ * @param namespace namespace of the schema to look up
+ * @param name name of the schema to look up
+ * @return a type 3 (name based) UUID, in string form, generated from the UTF-8 bytes of the
+ * string representation (toString) of the avro schema, or return null if nothing can be found
+ */
+ @Nullable
+ default String getVersion(String namespace, String name)
+ {
+ Schema schema = getSchema(namespace, name);
+ if (schema == null)
+ {
+ return null;
+ }
+ return
UUID.nameUUIDFromBytes(schema.toString().getBytes(StandardCharsets.UTF_8)).toString();
+ }
+
+ /**
+ * Get the datum writer
+ * @param namespace namespace of the schema the writer is bound to
+ * @param name name of the schema the writer is bound to
+ * @return datum writer or null if schema is not found
+ */
+ GenericDatumWriter<GenericRecord> getWriter(String namespace, String name);
+
+ /**
+ * Get the datum reader
+ * @param namespace namespace of the schema the reader is bound to
+ * @param name name of the schema the reader is bound to
+ * @return datum reader or null if schema is not found
+ */
+ GenericDatumReader<GenericRecord> getReader(String namespace, String name);
+}
diff --git a/settings.gradle
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/TableSchemaPublisher.java
similarity index 58%
copy from settings.gradle
copy to
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/TableSchemaPublisher.java
index bd431fe..d63be0d 100644
--- a/settings.gradle
+++
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/TableSchemaPublisher.java
@@ -17,17 +17,27 @@
* under the License.
*/
-rootProject.name = 'cassandra-analytics-core'
+package org.apache.cassandra.cdc.schemastore;
-include 'cassandra-analytics-cdc'
-include 'cassandra-analytics-common'
-include 'cassandra-bridge'
-include 'cassandra-four-zero'
-include 'cassandra-four-zero-bridge'
-include 'cassandra-four-zero-types'
-include 'cassandra-analytics-core'
-include 'cassandra-analytics-core-example'
-include 'cassandra-analytics-integration-framework'
-include 'cassandra-analytics-integration-tests'
-include 'cassandra-analytics-spark-converter'
-include 'cassandra-analytics-spark-four-zero-converter'
\ No newline at end of file
+import java.util.HashMap;
+
+/**
+ * Interface representing a CDC schema publisher.
+ * <p>
+ * The nested SchemaPublishMetadata type is a HashMap of String keys to String values that adds
+ * no members of its own; which keys are expected is not defined by this interface.
+ */
+public interface TableSchemaPublisher
+{
+
+ /**
+ * Publishes the schema producing a result.
+ *
+ * @param schema A string containing a valid schema.
+ * @param metadata All the needed metadata associated to the schema.
+ * @return The publishing result.
+ */
+ PublishSchemaResult publishSchema(String schema, SchemaPublishMetadata
metadata);
+
+ class SchemaPublishMetadata extends HashMap<String, String>
+ {
+
+ };
+}
diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
new file mode 100644
index 0000000..717b010
--- /dev/null
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ "type": "record",
+ "name": "CassandraCDC",
+ "namespace": "org.apache.cassandra",
+ "doc": "primary_keys and partition_keys are added dynamically in
RowSerializer",
+ "fields": [
+ {
+ "name": "timestampMicros",
+ "type": "long"
+ },
+ {
+ "name": "sourceTable",
+ "type": "string"
+ },
+ {
+ "name": "sourceKeyspace",
+ "type": "string"
+ },
+ {
+ "name": "schemaUuid",
+ "type": "string"
+ },
+ {
+ "name": "truncatedFields",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "version",
+ "type": ["null","string"],
+ "default": null,
+ "doc": "to allow serde of evolving schemas"
+ },
+ {
+ "name": "operationType",
+ "type": {
+ "type": "enum",
+ "name": "OperationType",
+ "namespace": "com.apple.mg",
+ "symbols": [
+ "INSERT",
+ "UPDATE",
+ "DELETE",
+ "DELETE_RANGE",
+ "DELETE_PARTITION"
+ ]
+ }
+ },
+ {
+ "name": "isPartial",
+ "type": "boolean",
+ "default": false
+ },
+ {
+ "name": "updateFields",
+ "doc": "used to differentiate between absent data, and data being
updated to a null value (when isPartial = true)",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": "string"
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "range",
+ "doc": "clustering keys only - logical AND of predicates",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "RangePredicate",
+ "namespace": "com.apple.mg",
+ "fields": [
+ {
+ "name": "field",
+ "type": "string"
+ },
+ {
+ "name": "rangePredicateType",
+ "type": {
+ "type": "enum",
+ "name": "RangePredicateType",
+ "namespace": "com.apple.mg",
+ "symbols": [
+ "LT",
+ "LTE",
+ "EQ",
+ "GTE",
+ "GT",
+ "IN"
+ ]
+ }
+ },
+ {
+ "name": "value",
+ "doc": "table schema (same as primary payload) with only a
single field populated",
+ "type": "bytes"
+ }
+ ]
+ }
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "ttl",
+ "doc": "TTL (if any) of the associated row",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "TTL",
+ "namespace": "com.apple.mg",
+ "fields": [
+ {
+ "name": "ttl",
+ "type": "int",
+ "doc": "Relative time in seconds as set in the CQL query"
+ },
+ {
+ "name": "deletedAt",
+ "type": "int",
+ "doc": "Future timestamp in seconds"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "payload",
+ "type": "bytes"
+ }
+ ]
+}
diff --git
a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
new file mode 100644
index 0000000..39424c7
--- /dev/null
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ "type": "record",
+ "name": "CassandraCDC",
+ "namespace": "org.apache.cassandra",
+ "doc": "primary_keys and partition_keys are added dynamically in
RowSerializer",
+ "fields": [
+ {
+ "name": "timestampMicros",
+ "type": "long"
+ },
+ {
+ "name": "sourceTable",
+ "type": "string"
+ },
+ {
+ "name": "sourceKeyspace",
+ "type": "string"
+ },
+ {
+ "name": "schemaUuid",
+ "type": "string"
+ },
+ {
+ "name": "truncatedFields",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "version",
+ "type": ["null","string"],
+ "default": null,
+ "doc": "to allow serde of evolving schemas"
+ },
+ {
+ "name": "operationType",
+ "type": {
+ "type": "enum",
+ "name": "OperationType",
+ "namespace": "com.apple.mg",
+ "symbols": [
+ "INSERT",
+ "UPDATE",
+ "DELETE",
+ "DELETE_RANGE",
+ "DELETE_PARTITION"
+ ]
+ }
+ },
+ {
+ "name": "isPartial",
+ "type": "boolean",
+ "default": false
+ },
+ {
+ "name": "updateFields",
+ "doc": "used to differentiate between absent data, and data being
updated to a null value (when isPartial = true)",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": "string"
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "range",
+ "doc": "clustering keys only - logical AND of predicates",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "RangePredicate",
+ "namespace": "com.apple.mg",
+ "fields": [
+ {
+ "name": "field",
+ "type": "string"
+ },
+ {
+ "name": "rangePredicateType",
+ "type": {
+ "type": "enum",
+ "name": "RangePredicateType",
+ "namespace": "com.apple.mg",
+ "symbols": [
+ "LT",
+ "LTE",
+ "EQ",
+ "GTE",
+ "GT",
+ "IN"
+ ]
+ }
+ },
+ {
+ "name": "value",
+ "doc": "table schema (same as primary payload) with only a
single field populated",
+ "type": "bytes"
+ }
+ ]
+ }
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "ttl",
+ "doc": "TTL (if any) of the associated row",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "TTL",
+ "namespace": "com.apple.mg",
+ "fields": [
+ {
+ "name": "ttl",
+ "type": "int",
+ "doc": "Relative time in seconds as set in the CQL query"
+ },
+ {
+ "name": "deletedAt",
+ "type": "int",
+ "doc": "Future timestamp in seconds"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/settings.gradle
b/cassandra-analytics-cdc-codec/src/main/resources/table_schemas/example.avsc
similarity index 60%
copy from settings.gradle
copy to
cassandra-analytics-cdc-codec/src/main/resources/table_schemas/example.avsc
index bd431fe..7cf59bd 100644
--- a/settings.gradle
+++
b/cassandra-analytics-cdc-codec/src/main/resources/table_schemas/example.avsc
@@ -17,17 +17,23 @@
* under the License.
*/
-rootProject.name = 'cassandra-analytics-core'
-
-include 'cassandra-analytics-cdc'
-include 'cassandra-analytics-common'
-include 'cassandra-bridge'
-include 'cassandra-four-zero'
-include 'cassandra-four-zero-bridge'
-include 'cassandra-four-zero-types'
-include 'cassandra-analytics-core'
-include 'cassandra-analytics-core-example'
-include 'cassandra-analytics-integration-framework'
-include 'cassandra-analytics-integration-tests'
-include 'cassandra-analytics-spark-converter'
-include 'cassandra-analytics-spark-four-zero-converter'
\ No newline at end of file
+{
+ "type" : "record",
+ "name" : "example",
+ "namespace" : "foo.namespace",
+ "doc" : "This is an example avro schema for a foo.namespace.example table",
+ "fields" : [ {
+ "name" : "id",
+ "type" : [ {
+ "type" : "string",
+ "logicalType" : "uuid"
+ }, "null" ],
+ "doc" : "doc"
+ }, {
+ "name" : "details",
+ "type" : [ "bytes", "null" ],
+ "doc" : "doc"
+ } ],
+ "primary_keys" : [ "id" ],
+ "partition_keys" : [ "id" ]
+}
\ No newline at end of file
diff --git a/gradle.properties b/gradle.properties
index 49e05bd..262e26b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,3 +39,5 @@ aswSdkVersion=2.26.12
commonsLang3Version=3.12.0
org.gradle.jvmargs=-Xmx4096m
+
+avroVersion=1.10.2
diff --git a/settings.gradle b/settings.gradle
index bd431fe..21e1965 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -30,4 +30,6 @@ include 'cassandra-analytics-core-example'
include 'cassandra-analytics-integration-framework'
include 'cassandra-analytics-integration-tests'
include 'cassandra-analytics-spark-converter'
-include 'cassandra-analytics-spark-four-zero-converter'
\ No newline at end of file
+include 'cassandra-analytics-spark-four-zero-converter'
+include 'cassandra-analytics-cdc-codec'
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]