bbotella commented on code in PR #101:
URL: 
https://github.com/apache/cassandra-analytics/pull/101#discussion_r1980060824


##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/CdcEventTransformer.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.spark.data.CqlField;
+
+/**
+ * Base abstraction to convert CdcEvent objects into another data format, e.g. 
Avro, Json etc
+ *
+ * @param <T> the type produced by the transformation.
+ */
+public abstract class CdcEventTransformer<T>
+{
+    public final Schema cdcSchema;
+    public final Schema ttlSchema;
+    public final Schema rangeSchema;
+
+    protected final BinaryEncoder encoder;
+    protected final Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup;
+    protected final SchemaStore store;
+
+    public CdcEventTransformer(SchemaStore store,
+                               Function<KeyspaceTypeKey, CqlField.CqlType> 
typeLookup,
+                               String templatePath)
+    {
+        this.cdcSchema = readSchema(templatePath);
+        this.ttlSchema = extractTtlSchema(cdcSchema);
+        this.rangeSchema = extractRangeSchema(cdcSchema);
+        this.encoder = EncoderFactory.get().binaryEncoder(new 
ByteArrayOutputStream(0), null);
+        this.typeLookup = typeLookup;
+        this.store = store;
+    }
+
+    public abstract T transform(CdcEvent event);

Review Comment:
   Please add a Javadoc comment here.

   Same for the rest of the methods.



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/CdcLogMode.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.msg.Value;
+import org.apache.cassandra.spark.data.CqlField;
+
+public enum CdcLogMode implements CdcLogger

Review Comment:
   Please add a Javadoc comment.



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/CachingSchemaStore.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Recommended implementation of SchemaStore that detects schema changes and 
regenerates Avro schema.
+ * Pass in a `SchemaStorePublisherFactory` to publish the schema downstream.
+ */
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    //    private final CassandraClusterSchema cassandraClusterSchema;
+//    private final SidecarSchema sidecarSchema;
+//    private final Vertx vertx;
+//    private final CdcConfigImpl cdcConfig;
+    @Nullable
+    volatile TableSchemaPublisher publisher;
+    @Nullable
+    volatile CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    //    private static final CqlToAvroSchemaConverter SCHEMA_CONVERTER = new 
CqlToAvroSchemaConverter();
+    private final SchemaSupplier schemaSupplier;
+    private final CassandraVersion cassandraVersion;
+    private final SchemaStorePublisherFactory schemaStorePublisherFactory;
+    private final CdcOptions cdcOptions;
+    private final CqlToAvroSchemaConverter schemaConverter;
+//    private final SidecarCdcStats sidecarCdcStats;

Review Comment:
   ```suggestion
   ```



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/CachingSchemaStore.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Recommended implementation of SchemaStore that detects schema changes and 
regenerates Avro schema.
+ * Pass in a `SchemaStorePublisherFactory` to publish the schema downstream.
+ */
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    //    private final CassandraClusterSchema cassandraClusterSchema;
+//    private final SidecarSchema sidecarSchema;
+//    private final Vertx vertx;
+//    private final CdcConfigImpl cdcConfig;

Review Comment:
   ```suggestion
   ```



##########
cassandra-analytics-cdc-codec/examples/basic.md:
##########
@@ -0,0 +1,202 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# basic Example
+
+## Cassandra Schema
+
+```
+CREATE TABLE examples.basic (
+    a text PRIMARY KEY,
+    b text,
+    c text
+)
+```
+
+## INSERT Example
+
+An INSERT into all primary keys and value columns.
+
+### Avro Header
+
+```
+{
+  "timestampMicros" : 1711149035712000,
+  "sourceTable" : "basic",
+  "sourceKeyspace" : "examples",
+  "schemaUuid" : "c02c81f8-83fb-3c36-988e-1cca7691e814",
+  "truncatedFields" : [ ],
+  "version" : {
+    "string" : "2"
+  },
+  "operationType" : "INSERT",
+  "isPartial" : true,
+  "updateFields" : {
+    "array" : [ "a", "b", "c" ]
+  },
+  "range" : null,
+  "ttl" : null,
+  "payload": "[binary serialized value of payload below]"
+}
+```
+
+
+
+### Avro Payload
+
+```
+{
+  "a" : {
+    "string" : "QsbX46DoBcpKOII2ggfRuP"
+  },
+  "b" : {
+    "string" : "fs"
+  },
+  "c" : {
+    "string" : "h4DCyo7N6h6mRIfQjX0Y"
+  }
+}
+```
+
+## UPDATE Example
+
+An UPDATE into all primary keys and a single value column.
+
+### Avro Header
+
+```
+{
+  "timestampMicros" : 1711149035712000,
+  "sourceTable" : "basic",
+  "sourceKeyspace" : "examples",
+  "schemaUuid" : "c02c81f8-83fb-3c36-988e-1cca7691e814",
+  "truncatedFields" : [ ],
+  "version" : {
+    "string" : "2"
+  },
+  "operationType" : "UPDATE",
+  "isPartial" : true,
+  "updateFields" : {
+    "array" : [ "a", "b" ]
+  },
+  "range" : null,
+  "ttl" : null,
+  "payload": "[binary serialized value of payload below]"
+}
+```
+
+
+
+### Avro Payload
+
+```
+{
+  "a" : {
+    "string" : "rnh5W9"
+  },
+  "b" : {
+    "string" : "hDmOY7Z4giTUAWuAk2vufwIYjj"
+  },
+  "c" : null
+}
+```
+
+## DELETE Example
+
+A point DELETE of a single value column.
+
+### Avro Header
+
+```
+{
+  "timestampMicros" : 1711149035712000,
+  "sourceTable" : "basic",
+  "sourceKeyspace" : "examples",
+  "schemaUuid" : "c02c81f8-83fb-3c36-988e-1cca7691e814",
+  "truncatedFields" : [ ],
+  "version" : {
+    "string" : "2"
+  },
+  "operationType" : "DELETE",
+  "isPartial" : true,
+  "updateFields" : {
+    "array" : [ "a", "b" ]
+  },
+  "range" : null,
+  "ttl" : null,
+  "payload": "[binary serialized value of payload below]"
+}
+```
+
+
+
+### Avro Payload
+
+```
+{
+  "a" : {
+    "string" : "Asemaitw9"
+  },
+  "b" : null,
+  "c" : null
+}
+```
+
+## PARTITION_DELETE Example
+
+A PARTITION_DELETE that deletes using only the partition keys.
+
+### Avro Header
+
+```
+{
+  "timestampMicros" : 1711149035713000,
+  "sourceTable" : "basic",
+  "sourceKeyspace" : "examples",
+  "schemaUuid" : "c02c81f8-83fb-3c36-988e-1cca7691e814",
+  "truncatedFields" : [ ],
+  "version" : {
+    "string" : "2"
+  },
+  "operationType" : "DELETE_PARTITION",
+  "isPartial" : true,
+  "updateFields" : {
+    "array" : [ "a" ]
+  },
+  "range" : null,
+  "ttl" : null,
+  "payload": "[binary serialized value of payload below]"
+}
+```
+
+
+
+### Avro Payload
+
+```
+{
+  "a" : {
+    "string" : "jqNMVJDtBLfxWmX1SC"
+  },
+  "b" : null,
+  "c" : null
+}
+```
+

Review Comment:
   What about adding examples for range delete?



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.TypeCache;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.Pair;
+import org.apache.cassandra.spark.utils.Preconditions;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class KafkaPublisher implements AutoCloseable
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPublisher.class);
+
+    protected TopicSupplier topicSupplier;
+    protected int maxRecordSizeBytes;
+    protected final RecordProducer recordProducer;
+    protected final EventHasher eventHasher;
+    protected boolean failOnRecordTooLargeError;
+    protected boolean failOnKafkaError;
+    protected CdcLogMode cdcLogMode;
+
+    protected final AtomicReference<Throwable> failure = new 
AtomicReference<>();
+    protected final KafkaProducer<String, byte[]> producer;
+    protected final Serializer<CdcEvent> serializer;
+
+    protected ThreadLocal<Map<Pair<String, String>, String>> prefixCache = 
ThreadLocal.withInitial(HashMap::new);
+    protected final KafkaStats kafkaStats;
+
+    public KafkaPublisher(TopicSupplier topicSupplier,
+                          KafkaProducer<String, byte[]> producer,
+                          Serializer<CdcEvent> serializer,
+                          int maxRecordSizeBytes,
+                          boolean failOnRecordTooLargeError,
+                          boolean failOnKafkaError,
+                          CdcLogMode logMode)
+    {
+        this(
+        topicSupplier,
+        producer,
+        serializer,
+        maxRecordSizeBytes,
+        failOnRecordTooLargeError,
+        failOnKafkaError,
+        logMode,
+        KafkaStats.STUB,
+        RecordProducer.DEFAULT,
+        EventHasher.MURMUR3
+        );
+    }
+
+    public KafkaPublisher(TopicSupplier topicSupplier,
+                          KafkaProducer<String, byte[]> producer,
+                          Serializer<CdcEvent> serializer,
+                          int maxRecordSizeBytes,
+                          boolean failOnRecordTooLargeError,
+                          boolean failOnKafkaError,
+                          CdcLogMode logMode,
+                          KafkaStats kafkaStats,
+                          RecordProducer recordProducer,
+                          EventHasher eventHasher)
+    {
+        this.topicSupplier = topicSupplier;
+        this.maxRecordSizeBytes = maxRecordSizeBytes;
+        this.failOnRecordTooLargeError = failOnRecordTooLargeError;
+        this.failOnKafkaError = failOnKafkaError;
+        this.cdcLogMode = logMode;
+        this.kafkaStats = kafkaStats;
+        this.serializer = serializer;
+        this.producer = producer;
+        this.eventHasher = eventHasher;
+        this.recordProducer = recordProducer;
+        CdcLogMode.init(this::getType);
+        kafkaStats.registerKafkaPublishErrorKpi();
+    }
+
+    public CqlField.CqlType getType(KeyspaceTypeKey key)
+    {
+        return TypeCache.get(version()).getType(key.keyspace, key.type);
+    }
+
+    public CassandraVersion version()
+    {
+        return CassandraVersion.FOURZERO;
+    }
+
+    public Logger logger()
+    {
+        //
+        return LOGGER;
+    }
+
+    protected RecordProducer recordProducer()
+    {
+        return recordProducer;
+    }
+
+    protected byte[] getPayload(String topic, CdcEvent event)
+    {
+        return serializer.serialize(topic, event);
+    }
+
+    public void processEvent(CdcEvent event)
+    {
+        final String topic = topicSupplier.topic(event);
+        cdcLogMode.info(logger(), "Processing CDC event", event, topic);
+        long time = System.currentTimeMillis();
+        final byte[] recordPayload = getPayload(topic, event);

Review Comment:
   Can `serializer.serialize` throw an exception whose details would be useful to include in the warn log message below?



##########
cassandra-analytics-cdc-codec/examples/basic.md:
##########
@@ -0,0 +1,202 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# basic Example

Review Comment:
   ```suggestion
   # Basic Example
   ```



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/AvroBaseRecordTransformer.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.cassandra.cdc.CdcEventTransformer;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.msg.Value;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.Preconditions;
+
+public abstract class AvroBaseRecordTransformer<T extends 
AvroBaseRecordTransformer.BaseSerializedEvent<P>, P>
+extends CdcEventTransformer<GenericData.Record>
+{
+    public static final int DEFAULT_TRUNCATE_THRESHOLD = 838861; // 1024 * 
1024 * 0.8

Review Comment:
   Could we add a comment explaining why 0.8 MB was chosen, mentioning the relevant Kafka defaults (e.g. the default `max.request.size`)?



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.TypeCache;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.Pair;
+import org.apache.cassandra.spark.utils.Preconditions;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class KafkaPublisher implements AutoCloseable
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPublisher.class);
+
+    protected TopicSupplier topicSupplier;
+    protected int maxRecordSizeBytes;
+    protected final RecordProducer recordProducer;
+    protected final EventHasher eventHasher;
+    protected boolean failOnRecordTooLargeError;
+    protected boolean failOnKafkaError;
+    protected CdcLogMode cdcLogMode;
+
+    protected final AtomicReference<Throwable> failure = new 
AtomicReference<>();
+    protected final KafkaProducer<String, byte[]> producer;
+    protected final Serializer<CdcEvent> serializer;
+
+    protected ThreadLocal<Map<Pair<String, String>, String>> prefixCache = 
ThreadLocal.withInitial(HashMap::new);
+    protected final KafkaStats kafkaStats;
+
+    public KafkaPublisher(TopicSupplier topicSupplier,
+                          KafkaProducer<String, byte[]> producer,
+                          Serializer<CdcEvent> serializer,
+                          int maxRecordSizeBytes,
+                          boolean failOnRecordTooLargeError,
+                          boolean failOnKafkaError,
+                          CdcLogMode logMode)
+    {
+        this(
+        topicSupplier,
+        producer,
+        serializer,
+        maxRecordSizeBytes,
+        failOnRecordTooLargeError,
+        failOnKafkaError,
+        logMode,
+        KafkaStats.STUB,
+        RecordProducer.DEFAULT,
+        EventHasher.MURMUR3
+        );
+    }
+
+    public KafkaPublisher(TopicSupplier topicSupplier,
+                          KafkaProducer<String, byte[]> producer,
+                          Serializer<CdcEvent> serializer,
+                          int maxRecordSizeBytes,
+                          boolean failOnRecordTooLargeError,
+                          boolean failOnKafkaError,
+                          CdcLogMode logMode,
+                          KafkaStats kafkaStats,
+                          RecordProducer recordProducer,
+                          EventHasher eventHasher)
+    {
+        this.topicSupplier = topicSupplier;
+        this.maxRecordSizeBytes = maxRecordSizeBytes;
+        this.failOnRecordTooLargeError = failOnRecordTooLargeError;
+        this.failOnKafkaError = failOnKafkaError;
+        this.cdcLogMode = logMode;
+        this.kafkaStats = kafkaStats;
+        this.serializer = serializer;
+        this.producer = producer;
+        this.eventHasher = eventHasher;
+        this.recordProducer = recordProducer;
+        CdcLogMode.init(this::getType);
+        kafkaStats.registerKafkaPublishErrorKpi();
+    }
+
+    public CqlField.CqlType getType(KeyspaceTypeKey key)
+    {
+        return TypeCache.get(version()).getType(key.keyspace, key.type);
+    }
+
+    public CassandraVersion version()
+    {
+        return CassandraVersion.FOURZERO;
+    }
+
+    public Logger logger()
+    {
+        //

Review Comment:
   ```suggestion
   ```



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/CdcEventTransformer.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.spark.data.CqlField;
+
+/**
+ * Base abstraction to convert CdcEvent objects into another data format, e.g. 
Avro, Json etc
+ *
+ * @param <T> the type produced by the transformation.
+ */
+public abstract class CdcEventTransformer<T>
+{
+    public final Schema cdcSchema;
+    public final Schema ttlSchema;
+    public final Schema rangeSchema;
+
+    protected final BinaryEncoder encoder;
+    protected final Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup;
+    protected final SchemaStore store;
+
+    public CdcEventTransformer(SchemaStore store,
+                               Function<KeyspaceTypeKey, CqlField.CqlType> 
typeLookup,
+                               String templatePath)
+    {
+        this.cdcSchema = readSchema(templatePath);
+        this.ttlSchema = extractTtlSchema(cdcSchema);
+        this.rangeSchema = extractRangeSchema(cdcSchema);
+        this.encoder = EncoderFactory.get().binaryEncoder(new 
ByteArrayOutputStream(0), null);
+        this.typeLookup = typeLookup;
+        this.store = store;
+    }
+
+    public abstract T transform(CdcEvent event);
+
+    private static Schema readSchema(String filename)
+    {
+        ClassLoader classLoader = CdcEventTransformer.class.getClassLoader();
+        final InputStream is = classLoader.getResourceAsStream(filename);
+        try
+        {
+            return new Schema.Parser().parse(is);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Schema extractTtlSchema(Schema cdcSchema)
+    {
+        List<Schema> nullableTtlUnion = 
cdcSchema.getField("ttl").schema().getTypes();
+        return nullableTtlUnion.stream()
+                               .filter(s -> s.getType() == Schema.Type.RECORD)
+                               .findFirst()
+                               .get(); // the field exist. see cdc.avsc file

Review Comment:
   Still, should we use a `getOrThrow`?



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/AvroGenericRecordTransformer.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.Preconditions;
+
+import static 
org.apache.cassandra.cdc.avro.AvroGenericRecordTransformer.GenericRecordSerializedEvent;
+
+public class AvroGenericRecordTransformer extends
+                                          
AvroBaseRecordTransformer<GenericRecordSerializedEvent, GenericRecord>
+{
+    final String schemaNamespacePrefix;
+
+    public AvroGenericRecordTransformer(SchemaStore schemaStore,
+                                        Function<KeyspaceTypeKey, 
CqlField.CqlType> typeLookup,
+                                        String schemaNamespacePrefix)
+    {
+        this(schemaStore, typeLookup, DEFAULT_TRUNCATE_THRESHOLD, 
schemaNamespacePrefix);
+    }
+
+    public AvroGenericRecordTransformer(SchemaStore schemaStore,
+                                        Function<KeyspaceTypeKey, 
CqlField.CqlType> typeLookup,
+                                        int truncateThreshold,
+                                        String schemaNamespacePrefix)
+    {
+        super(schemaStore, typeLookup, truncateThreshold, 
"cdc_generic_record.avsc");
+        this.schemaNamespacePrefix = schemaNamespacePrefix;
+    }
+
+    @Override
+    public GenericRecordSerializedEvent serializeEvent(CdcEvent event)
+    {
+        CdcEventUtils.UpdatedEvent updatedEvent = 
CdcEventUtils.getUpdatedEvent(event, store, truncateThreshold, typeLookup);
+        return new GenericRecordSerializedEvent(updatedEvent.getRecord(), 
updatedEvent.getTruncatedFields(), event.table, event.keyspace);
+    }
+
+    @Override
+    public GenericData.Record 
buildRecordWithPayload(GenericRecordSerializedEvent serializedEvent)
+    {
+        GenericRecord payload = serializedEvent.payload;
+        Schema newSchema = getTempSchemaForEvent(serializedEvent);
+        GenericData.Record record = new GenericData.Record(newSchema);
+        record.put("payload", payload);

Review Comment:
   Possibly a nit, but it would be fine to move all of the wrapper schema 
fields to their own static constants



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/CachingSchemaStore.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Recommended implementation of SchemaStore that detects schema changes and 
regenerates Avro schema.
+ * Pass in a `SchemaStorePublisherFactory` to publish the schema downstream.
+ */
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    //    private final CassandraClusterSchema cassandraClusterSchema;
+//    private final SidecarSchema sidecarSchema;
+//    private final Vertx vertx;
+//    private final CdcConfigImpl cdcConfig;
+    @Nullable
+    volatile TableSchemaPublisher publisher;
+    @Nullable
+    volatile CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    //    private static final CqlToAvroSchemaConverter SCHEMA_CONVERTER = new 
CqlToAvroSchemaConverter();
+    private final SchemaSupplier schemaSupplier;
+    private final CassandraVersion cassandraVersion;
+    private final SchemaStorePublisherFactory schemaStorePublisherFactory;
+    private final CdcOptions cdcOptions;
+    private final CqlToAvroSchemaConverter schemaConverter;
+//    private final SidecarCdcStats sidecarCdcStats;
+
+    CachingSchemaStore(
+//    Vertx vertx,
+//                       CassandraClusterSchema cassandraClusterSchema,
+//                       CdcConfigImpl cdcConfig,
+//                       SidecarCdcStats sidecarCdcStats,
+//                       SidecarSchema sidecarSchema
+    CassandraVersion cassandraVersion, //TODO: supply version to permit 
changes in version
+    SchemaSupplier schemaSupplier,
+    SchemaStorePublisherFactory schemaStorePublisherFactory,
+    CdcOptions cdcOptions,
+    CqlToAvroSchemaConverter schemaConverter
+    )
+    {
+        super();
+        this.cassandraVersion = cassandraVersion;
+        this.schemaSupplier = schemaSupplier;
+        this.schemaStorePublisherFactory = schemaStorePublisherFactory;
+        this.cdcOptions = cdcOptions;
+        this.schemaConverter = schemaConverter;
+//        this.cassandraClusterSchema = cassandraClusterSchema;
+//        this.sidecarSchema = sidecarSchema;
+//        
this.avroSchemasCache.putAll(createSchemaCache(schemaSupplier.getCdcEnabledTables().get()));
+        AvroSchemas.registerLogicalTypes();
+//        
cassandraClusterSchema.addSchemaChangeListener(this::onSchemaChanged); //FIXME
+//        this.vertx = vertx;
+//        this.cdcConfig = cdcConfig;
+//        this.sidecarCdcStats = sidecarCdcStats;
+
+        final Callable<Void> configChangeCallback = () -> {
+            LOGGER.info("Services configuration changed. Reloading 
publisher...");
+            publishSchemas();
+            return null;
+        };
+//        this.cdcConfig.registerConfigChangeListener(configChangeCallback); 
//FIXME
+        configureSidecarServerEventListeners();
+    }
+
+    private TableSchemaPublisher publisher()
+    {
+        if (this.publisher == null)
+        {
+            this.publisher = 
schemaStorePublisherFactory.buildPublisher(cdcOptions); 
//SchemaStorePublisherFactory.getFromCdcConfig(cdcConfig);
+        }
+        return this.publisher;
+    }
+
+    private void configureSidecarServerEventListeners()
+    {
+        //FIXME
+//        EventBus eventBus = vertx.eventBus();
+//
+//        eventBus.localConsumer(ON_SERVER_START.address(), startMessage -> {
+//            eventBus.localConsumer(ON_SIDECAR_SCHEMA_INITIALIZED.address(), 
message -> {
+//                LOGGER.debug("Sidecar Schema initialized message={}", 
message);
+//                Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchema.getCdcTables();
+//                for (CqlTable cqlTable : refreshedCdcTables)
+//                {
+//                    TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+//                    avroSchemasCache.compute(tableIdentifier, (k, v) ->
+//                    {
+//                        
cdcDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStmt());
+//                        return v;
+//                    });
+//                }
+//                loadPublisher();
+//                publishSchemas();
+//            });
+//        });

Review Comment:
   I guess this implementation is still to be finished?



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/CachingSchemaStore.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Recommended implementation of SchemaStore that detects schema changes and 
regenerates Avro schema.
+ * Pass in a `SchemaStorePublisherFactory` to publish the schema downstream.
+ */
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    //    private final CassandraClusterSchema cassandraClusterSchema;
+//    private final SidecarSchema sidecarSchema;
+//    private final Vertx vertx;
+//    private final CdcConfigImpl cdcConfig;
+    @Nullable
+    volatile TableSchemaPublisher publisher;
+    @Nullable
+    volatile CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    //    private static final CqlToAvroSchemaConverter SCHEMA_CONVERTER = new 
CqlToAvroSchemaConverter();
+    private final SchemaSupplier schemaSupplier;
+    private final CassandraVersion cassandraVersion;
+    private final SchemaStorePublisherFactory schemaStorePublisherFactory;
+    private final CdcOptions cdcOptions;
+    private final CqlToAvroSchemaConverter schemaConverter;
+//    private final SidecarCdcStats sidecarCdcStats;
+
+    CachingSchemaStore(
+//    Vertx vertx,
+//                       CassandraClusterSchema cassandraClusterSchema,
+//                       CdcConfigImpl cdcConfig,
+//                       SidecarCdcStats sidecarCdcStats,
+//                       SidecarSchema sidecarSchema

Review Comment:
   ```suggestion
   ```



##########
cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/CachingSchemaStore.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.schemastore;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Recommended implementation of SchemaStore that detects schema changes and 
regenerates Avro schema.
+ * Pass in a `SchemaStorePublisherFactory` to publish the schema downstream.
+ */
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    //    private final CassandraClusterSchema cassandraClusterSchema;
+//    private final SidecarSchema sidecarSchema;
+//    private final Vertx vertx;
+//    private final CdcConfigImpl cdcConfig;
+    @Nullable
+    volatile TableSchemaPublisher publisher;
+    @Nullable
+    volatile CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    //    private static final CqlToAvroSchemaConverter SCHEMA_CONVERTER = new 
CqlToAvroSchemaConverter();

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to