echauchot commented on code in PR #35:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/35#discussion_r2215451721


##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */

Review Comment:
   refer the source doc for cluster builder creation



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -98,16 +143,56 @@ public class CassandraSource<OUT>
     private static final long serialVersionUID = 1L;
 
     private final ClusterBuilder clusterBuilder;
-    private final Class<OUT> pojoClass;
     private final String query;
     private final String keyspace;
     private final String table;
-    private final MapperOptions mapperOptions;
-
+    private final CassandraRowToTypeConverter<OUT> rowToTypeConverter;
     private final long maxSplitMemorySize;
-    private static final long MIN_SPLIT_MEMORY_SIZE = 
MemorySize.ofMebiBytes(10).getBytes();
+    static final long MIN_SPLIT_MEMORY_SIZE = 
MemorySize.ofMebiBytes(10).getBytes();
     static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = 
MemorySize.ofMebiBytes(64).getBytes();
 
+    /**
+     * Creates a new builder for configuring a CassandraSource.
+     *
+     * @return a new CassandraSourceBuilder instance
+     */
+    public static CassandraSourceBuilder builder() {
+        return new CassandraSourceBuilder();
+    }
+
+    CassandraSource(
+            ClusterBuilder clusterBuilder,
+            CassandraRowToTypeConverter<OUT> rowToTypeConverter,
+            String query) {
+        this(clusterBuilder, MAX_SPLIT_MEMORY_SIZE_DEFAULT, 
rowToTypeConverter, query);
+    }
+
+    CassandraSource(
+            ClusterBuilder clusterBuilder,
+            long maxSplitMemorySize,
+            CassandraRowToTypeConverter<OUT> rowToTypeConverter,
+            String query) {
+        Objects.requireNonNull(clusterBuilder, "ClusterBuilder required but 
not provided");
+        Objects.requireNonNull(rowToTypeConverter, "Row converter required but 
not provided");
+        Objects.requireNonNull(query, "query required but not provided");
+        if (maxSplitMemorySize < MIN_SPLIT_MEMORY_SIZE) {

Review Comment:
   use flink util preconditions: checkNotNull, checkState ...



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setClusterBuilder(ClusterBuilder 
clusterBuilder) {
+        this.clusterBuilder = clusterBuilder;
+        return this;
+    }
+
+    /**
+     * Sets the CQL query to execute.
+     *
+     * <p>Query must be a simple SELECT statement without aggregations, ORDER 
BY, or GROUP BY
+     * clauses, as these operations are not supported when the query is 
executed on data partitions.
+     *
+     * @param query the CQL query string
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setQuery(String query) {
+        this.query = query;
+        return this;
+    }
+
+    /**
+     * Sets the maximum memory size for each split. Larger tables will be 
divided into multiple
+     * splits based on this size.
+     *
+     * <p>Default: 64MB, Minimum: 10MB
+     *
+     * @param maxSplitMemorySize the maximum memory size per split
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMaxSplitMemorySize(MemorySize 
maxSplitMemorySize) {
+        this.maxSplitMemorySize = maxSplitMemorySize.getBytes();
+        return this;
+    }
+
+    /**
+     * Sets mapper options for POJO mapping configuration.
+     *
+     * <p>Optional. If not set, default mapper options will be used.
+     *
+     * @param mapperOptions the mapper options for DataStax object mapper
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMapperOptions(MapperOptions 
mapperOptions) {
+        this.mapperOptions = mapperOptions;
+        return this;
+    }
+
+    /**
+     * Builds a CassandraSource configured for POJO output using DataStax 
object mapper.

Review Comment:
   `{@link CassandraSource}`



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java:
##########
@@ -45,7 +46,24 @@ class CassandraSourceReader<OUT>
     private final Cluster cluster;
     private final Session session;
 
-    // created by the factory
+    CassandraSourceReader(
+            SourceReaderContext context,
+            String query,
+            String keyspace,
+            String table,
+            Cluster cluster,
+            Session session,
+            CassandraRowToTypeConverter<OUT> rowConverter) {
+        super(
+                () -> new CassandraSplitReader(cluster, session, query, 
keyspace, table),
+                new CassandraRowEmitter<>(rowConverter),
+                context.getConfiguration(),
+                context);
+        this.cluster = cluster;
+        this.session = session;
+    }
+
+    @Deprecated

Review Comment:
   CassandraSourceReader and CassandraSourceReaderFactory are internal classes. 
There is no need to deprecate old constructor/factory.create() as they are not 
public to the user. This old code is no more used so you can safely remove it.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CassandraFieldMapper.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+
+import com.datastax.driver.core.Row;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for all Cassandra field mappers that convert values to 
Flink's internal {@link
+ * org.apache.flink.table.types.logical.LogicalTypeRoot}.
+ */
+@Internal
+public interface CassandraFieldMapper extends Serializable {
+
+    /**
+     * Extract a field value from a Cassandra Row and convert it to Flink 
internal format.
+     *
+     * <p>This method is used when reading from the Cassandra {@link Row} 
object returned by a
+     * query. It handles null checking and uses Cassandra's type-specific 
getters (getString,
+     * getInt, etc.) to safely extract the field value before converting it to 
Flink's internal
+     * representation.
+     *
+     * @param row the Cassandra {@link Row} object containing the field
+     * @param fieldName the name of the field to extract
+     * @return the field value converted to Flink internal format, or null if 
the field is null
+     */
+    Object extractFromRow(Row row, String fieldName);

Review Comment:
   Some changes:
   - annotate with Nullable for static analysis
   - I'd rename extractRowFieldValue for clarity 



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CassandraFieldMapperFactory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Factory for creating appropriate field mappers based on Flink logical 
types. */
+@Internal
+public final class CassandraFieldMapperFactory {
+
+    private CassandraFieldMapperFactory() {}
+
+    /**
+     * Creates a field mapper for the given logical type. Different mappers 
for different {@link
+     * com.datastax.driver.core.DataType}
+     *
+     * @param logicalType the Flink logical type
+     * @return appropriate field mapper for the type
+     */
+    public static CassandraFieldMapper createFieldMapper(LogicalType 
logicalType) {
+        LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
+
+        switch (typeRoot) {
+                // Supported primitive types
+            case BOOLEAN:
+                return new PrimitiveFieldMappers.BooleanMapper();
+            case TINYINT:
+                return new PrimitiveFieldMappers.ByteMapper();
+            case SMALLINT:
+                return new PrimitiveFieldMappers.ShortMapper();
+            case INTEGER:
+                return new PrimitiveFieldMappers.IntegerMapper();
+            case BIGINT:

Review Comment:
   is there an overflow risk ? Why not use the VarintMapper ?
   



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/CassandraConnectorOptions.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the Cassandra connector. */
+@PublicEvolving
+public class CassandraConnectorOptions {

Review Comment:
   some conf items are duplicated, how do they articulate, I see the validation 
and the defaults but:
   - if a user provides host and port + custom clusterbuilder: which has the 
precedence ?
   - if a user provides keyspace/table + custom query: which has the precedence 
?



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/PrimitiveFieldMappers.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import com.datastax.driver.core.Duration;
+import com.datastax.driver.core.LocalDate;
+import com.datastax.driver.core.Row;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Date;
+
+/** Collection of primitive type field mappers for Cassandra to Flink 
conversion. */
+@Internal
+public final class PrimitiveFieldMappers {
+
+    private PrimitiveFieldMappers() {}
+
+    /** Boolean field mapper. */
+    public static final class BooleanMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getBool(fieldName);
+        }
+    }
+
+    /** Byte field mapper. */
+    public static final class ByteMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getByte(fieldName);
+        }
+    }
+
+    /** Short field mapper. */
+    public static final class ShortMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getShort(fieldName);
+        }
+    }
+
+    /** Integer field mapper. */
+    public static final class IntegerMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getInt(fieldName);
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            if (value instanceof Number) {
+                return ((Number) value).intValue();
+            }
+            return value;
+        }
+    }
+
+    /** Long field mapper. */
+    public static final class LongMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getLong(fieldName);
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            if (value instanceof Number) {
+                return ((Number) value).longValue();
+            }
+            return value;
+        }
+    }
+
+    /** Float field mapper. */
+    public static final class FloatMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getFloat(fieldName);
+        }
+    }
+
+    /** Double field mapper. */
+    public static final class DoubleMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            return row.isNull(fieldName) ? null : row.getDouble(fieldName);
+        }
+    }
+
+    /** String field mapper that handles text, uuid, timeuuid, and inet types. 
*/
+    public static final class StringMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            if (row.isNull(fieldName)) {
+                return null;
+            }
+            String columnType = 
row.getColumnDefinitions().getType(fieldName).getName().toString();
+
+            switch (columnType) {
+                case "inet":
+                    InetAddress inet = row.getInet(fieldName);
+                    return StringData.fromString(inet.getHostAddress());
+                case "duration":
+                    Duration duration = row.get(fieldName, Duration.class);
+                    return StringData.fromString(duration.toString());
+                case "uuid":
+                case "timeuuid":
+                    return 
StringData.fromString(row.getUUID(fieldName).toString());
+                default:
+                    return convertValue(row.getString(fieldName));
+            }
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            return StringData.fromString(value.toString());
+        }
+    }
+
+    /**
+     * Decimal field mapper for Cassandra DECIMAL type.
+     *
+     * <p>Handles precision and scale constraints according to Flink's 
DecimalType limits:
+     *
+     * <ul>
+     *   <li>Precision: 1-38 (inclusive)
+     *   <li>Scale: 0 to precision (inclusive)
+     *   <li>Default precision: 10, default scale: 0
+     * </ul>
+     */
+    public static final class DecimalMapper implements CassandraFieldMapper {
+        private final DecimalType decimalType;
+
+        public DecimalMapper(DecimalType decimalType) {
+            this.decimalType = decimalType;
+        }
+
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            if (row.isNull(fieldName)) {
+                return null;
+            }
+            return convertValue(row.getDecimal(fieldName));
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            BigDecimal decimal = (BigDecimal) value;
+            return DecimalData.fromBigDecimal(
+                    decimal, decimalType.getPrecision(), 
decimalType.getScale());
+        }
+    }
+
+    /** Date field mapper that handles Cassandra LocalDate to Flink internal 
date format. */
+    public static final class DateMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            if (row.isNull(fieldName)) {
+                return null;
+            }
+            return convertValue(row.getDate(fieldName));
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            LocalDate date = (LocalDate) value;
+            return (int)
+                    java.time.LocalDate.of(date.getYear(), date.getMonth(), 
date.getDay())
+                            .toEpochDay();
+        }
+    }
+
+    /** Time field mapper that handles Cassandra time to Flink TIME 
conversion. */
+    public static final class TimeMapper implements CassandraFieldMapper {
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            if (row.isNull(fieldName)) {
+                return null;
+            }
+            return convertValue(row.getTime(fieldName));
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            // Cassandra time is nanoseconds since midnight (long)
+            // Flink TIME is milliseconds since midnight (int)
+            if (value instanceof Long) {
+                long nanoseconds = (Long) value;
+                return (int) (nanoseconds / 1_000_000); // Convert nanoseconds 
to milliseconds

Review Comment:
   org.apache.flink.table.types.logical.TimeType can handle nanoseconds though 
a long that would allow to not loose precision



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -98,16 +143,56 @@ public class CassandraSource<OUT>
     private static final long serialVersionUID = 1L;
 
     private final ClusterBuilder clusterBuilder;
-    private final Class<OUT> pojoClass;
     private final String query;
     private final String keyspace;
     private final String table;
-    private final MapperOptions mapperOptions;
-
+    private final CassandraRowToTypeConverter<OUT> rowToTypeConverter;
     private final long maxSplitMemorySize;
-    private static final long MIN_SPLIT_MEMORY_SIZE = 
MemorySize.ofMebiBytes(10).getBytes();
+    static final long MIN_SPLIT_MEMORY_SIZE = 
MemorySize.ofMebiBytes(10).getBytes();
     static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = 
MemorySize.ofMebiBytes(64).getBytes();
 
+    /**
+     * Creates a new builder for configuring a CassandraSource.
+     *
+     * @return a new CassandraSourceBuilder instance
+     */
+    public static CassandraSourceBuilder builder() {
+        return new CassandraSourceBuilder();
+    }
+
+    CassandraSource(

Review Comment:
   remove unused constructor



##########
flink-connector-cassandra/archunit-violations/fdc0cb7f-d8e1-4703-bc93-8745dbe93bd6:
##########
@@ -0,0 +1,2 @@
+org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.setClusterBuilder(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder):
 Argument leaf type 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated

Review Comment:
   annotating ClusterBuilder "@PublicEvolving" seems good to me and remove the 
archunit violation



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setClusterBuilder(ClusterBuilder 
clusterBuilder) {
+        this.clusterBuilder = clusterBuilder;
+        return this;
+    }
+
+    /**
+     * Sets the CQL query to execute.
+     *
+     * <p>Query must be a simple SELECT statement without aggregations, ORDER 
BY, or GROUP BY
+     * clauses, as these operations are not supported when the query is 
executed on data partitions.
+     *
+     * @param query the CQL query string
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setQuery(String query) {
+        this.query = query;
+        return this;
+    }
+
+    /**
+     * Sets the maximum memory size for each split. Larger tables will be 
divided into multiple
+     * splits based on this size.
+     *
+     * <p>Default: 64MB, Minimum: 10MB
+     *
+     * @param maxSplitMemorySize the maximum memory size per split
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMaxSplitMemorySize(MemorySize 
maxSplitMemorySize) {
+        this.maxSplitMemorySize = maxSplitMemorySize.getBytes();
+        return this;
+    }
+
+    /**
+     * Sets mapper options for POJO mapping configuration.
+     *
+     * <p>Optional. If not set, default mapper options will be used.
+     *
+     * @param mapperOptions the mapper options for DataStax object mapper
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMapperOptions(MapperOptions 
mapperOptions) {
+        this.mapperOptions = mapperOptions;
+        return this;
+    }
+
+    /**
+     * Builds a CassandraSource configured for POJO output using DataStax 
object mapper.
+     *
+     * <p>The POJO class must be annotated with DataStax mapping annotations 
(e.g., {@code @Table},
+     * {@code @Column}).
+     *
+     * @param <T> the POJO type
+     * @param pojoClass the POJO class to map rows to
+     * @return the configured CassandraSource instance
+     * @throws IllegalStateException if required parameters are missing or 
invalid
+     */
+    public <T> CassandraSource<T> forPojo(Class<T> pojoClass) {
+        validateCommonParameters();
+        Objects.requireNonNull(pojoClass, "POJO class is required");

Review Comment:
   use flink preconditions



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -36,54 +36,99 @@
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
 import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
 import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
 import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
+import com.datastax.driver.mapping.MappingManager;
+
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A bounded source to read from Cassandra and return a collection of entities 
as {@code
- * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
- * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
- * in <a
- * 
href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/";>
- * Cassandra object mapper</a>).
+ * A bounded source to read from Cassandra and return data as a {@code 
DataStream<OUT>} where OUT
+ * can be any type determined by the provided converter. This source uses a 
pluggable converter
+ * interface that supports both {@link org.apache.flink.table.api.Table} and 
{@link
+ * org.apache.flink.streaming.api.datastream.DataStream}:
+ *
+ * <ul>
+ *   <li><b>DataStream API with POJOs</b>: Uses {@link 
CassandraRowToPojoConverter} with Cassandra's
+ *       {@link MappingManager} to convert cassandra's {@link 
com.datastax.driver.core.Row} to
+ *       annotated POJOs
+ *   <li><b>Table/SQL API</b>: Uses {@link CassandraRowToRowDataConverter} 
with field-based mapping
+ *       to convert rows to {@link org.apache.flink.table.data.RowData}
+ * </ul>
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>1. DataStream API with POJO (Object Mapper):</h4>
  *
- * <p>To use it, do the following:
+ * <p>For DataStream applications using POJOs with Cassandra annotations:
  *
  * <pre>{@code
  * ClusterBuilder clusterBuilder = new ClusterBuilder() {
  *   @Override
  *   protected Cluster buildCluster(Cluster.Builder builder) {
- *     return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *     return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, 
PORT))
  *                   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
- *                   .withSocketOptions(new SocketOptions()
- *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
- *                   .setReadTimeoutMillis(READ_TIMEOUT))
  *                   .build();
  *   }
  * };
- * long maxSplitMemorySize = ... //optional max split size in bytes minimum is 
10MB. If not set, maxSplitMemorySize = 64 MB
- * Source cassandraSource = new CassandraSource(clusterBuilder,
- *                                              maxSplitMemorySize,
- *                                              Pojo.class,
- *                                              "select ... from 
KEYSPACE.TABLE ...;",
- *                                              () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
- *
- * DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),
- * "CassandraSource");
+ *
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)})
+ *     .buildForPojo(MyPojo.class);
+ *
+ * DataStream<MyPojo> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "CassandraSource");
+ * }</pre>
+ *
+ * <h4>2. Table/SQL API with RowData (Field Mapping):</h4>
+ *
+ * <p>For Table API and SQL applications, users create tables via SQL DDL. The 
factory automatically
+ * creates and configures the CassandraSource internally:
+ *
+ * <pre>{@code
+ * // Create Cassandra table using SQL DDL with custom ClusterBuilder
+ * tableEnv.executeSql(
+ *     "CREATE TABLE users (" +
+ *     "  id INT," +
+ *     "  name STRING," +
+ *     "  age INT," +
+ *     "  address ROW<street STRING, city STRING, zipcode INT>" +
+ *     ") WITH (" +
+ *     "  'connector' = 'cassandra'," +
+ *     "  'hosts' = 'localhost:9042'," +
+ *     "  'keyspace' = 'my_keyspace'," +
+ *     "  'table' = 'users'," +
+ *     "  'cluster-builder-class' = 'com.example.MyCustomClusterBuilder'" +
+ *     ")"
+ * );
+ *
+ * // Query using SQL - CassandraSource is created automatically by the factory
+ * Table result = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 
25");
  * }</pre>
  *
- * <p>Regarding performances, the source splits table data like this: 
numSplits =
- * tableSize/maxSplitMemorySize. If tableSize cannot be determined or previous 
numSplits computation

Review Comment:
   leave "or numSplits computation makes too few splits" it is important. It is 
for the case where the user specifies a too high split size that would like to 
1 split. Then fallback to parallelism splits.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -36,54 +36,99 @@
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
 import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
 import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
 import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
+import com.datastax.driver.mapping.MappingManager;
+
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A bounded source to read from Cassandra and return a collection of entities 
as {@code
- * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
- * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
- * in <a
- * 
href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/";>
- * Cassandra object mapper</a>).
+ * A bounded source to read from Cassandra and return data as a {@code 
DataStream<OUT>} where OUT
+ * can be any type determined by the provided converter. This source uses a 
pluggable converter
+ * interface that supports both {@link org.apache.flink.table.api.Table} and 
{@link
+ * org.apache.flink.streaming.api.datastream.DataStream}:
+ *
+ * <ul>
+ *   <li><b>DataStream API with POJOs</b>: Uses {@link 
CassandraRowToPojoConverter} with Cassandra's
+ *       {@link MappingManager} to convert cassandra's {@link 
com.datastax.driver.core.Row} to
+ *       annotated POJOs
+ *   <li><b>Table/SQL API</b>: Uses {@link CassandraRowToRowDataConverter} 
with field-based mapping
+ *       to convert rows to {@link org.apache.flink.table.data.RowData}
+ * </ul>
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>1. DataStream API with POJO (Object Mapper):</h4>
  *
- * <p>To use it, do the following:
+ * <p>For DataStream applications using POJOs with Cassandra annotations:
  *
  * <pre>{@code
  * ClusterBuilder clusterBuilder = new ClusterBuilder() {
  *   @Override
  *   protected Cluster buildCluster(Cluster.Builder builder) {
- *     return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *     return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, 
PORT))
  *                   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
- *                   .withSocketOptions(new SocketOptions()
- *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
- *                   .setReadTimeoutMillis(READ_TIMEOUT))
  *                   .build();
  *   }
  * };
- * long maxSplitMemorySize = ... //optional max split size in bytes minimum is 
10MB. If not set, maxSplitMemorySize = 64 MB
- * Source cassandraSource = new CassandraSource(clusterBuilder,
- *                                              maxSplitMemorySize,
- *                                              Pojo.class,
- *                                              "select ... from 
KEYSPACE.TABLE ...;",
- *                                              () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
- *
- * DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),
- * "CassandraSource");
+ *
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)})
+ *     .buildForPojo(MyPojo.class);
+ *
+ * DataStream<MyPojo> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "CassandraSource");
+ * }</pre>
+ *
+ * <h4>2. Table/SQL API with RowData (Field Mapping):</h4>
+ *
+ * <p>For Table API and SQL applications, users create tables via SQL DDL. The 
factory automatically
+ * creates and configures the CassandraSource internally:
+ *
+ * <pre>{@code
+ * // Create Cassandra table using SQL DDL with custom ClusterBuilder
+ * tableEnv.executeSql(
+ *     "CREATE TABLE users (" +
+ *     "  id INT," +
+ *     "  name STRING," +
+ *     "  age INT," +
+ *     "  address ROW<street STRING, city STRING, zipcode INT>" +
+ *     ") WITH (" +
+ *     "  'connector' = 'cassandra'," +
+ *     "  'hosts' = 'localhost:9042'," +
+ *     "  'keyspace' = 'my_keyspace'," +
+ *     "  'table' = 'users'," +
+ *     "  'cluster-builder-class' = 'com.example.MyCustomClusterBuilder'" +

Review Comment:
   Please document config options (precedence, defaults etc) with
   - clusterbuilder vs host/port 
   - query vs keyspace/table



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -36,54 +36,99 @@
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
 import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
 import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
 import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
+import com.datastax.driver.mapping.MappingManager;
+
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A bounded source to read from Cassandra and return a collection of entities 
as {@code
- * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
- * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
- * in <a
- * 
href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/";>
- * Cassandra object mapper</a>).
+ * A bounded source to read from Cassandra and return data as a {@code 
DataStream<OUT>} where OUT
+ * can be any type determined by the provided converter. This source uses a 
pluggable converter
+ * interface that supports both {@link org.apache.flink.table.api.Table} and 
{@link
+ * org.apache.flink.streaming.api.datastream.DataStream}:
+ *
+ * <ul>
+ *   <li><b>DataStream API with POJOs</b>: Uses {@link 
CassandraRowToPojoConverter} with Cassandra's
+ *       {@link MappingManager} to convert cassandra's {@link 
com.datastax.driver.core.Row} to
+ *       annotated POJOs
+ *   <li><b>Table/SQL API</b>: Uses {@link CassandraRowToRowDataConverter} 
with field-based mapping
+ *       to convert rows to {@link org.apache.flink.table.data.RowData}
+ * </ul>
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>1. DataStream API with POJO (Object Mapper):</h4>
  *
- * <p>To use it, do the following:
+ * <p>For DataStream applications using POJOs with Cassandra annotations:
  *
  * <pre>{@code
  * ClusterBuilder clusterBuilder = new ClusterBuilder() {
  *   @Override
  *   protected Cluster buildCluster(Cluster.Builder builder) {
- *     return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *     return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, 
PORT))
  *                   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
- *                   .withSocketOptions(new SocketOptions()
- *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
- *                   .setReadTimeoutMillis(READ_TIMEOUT))
  *                   .build();
  *   }
  * };
- * long maxSplitMemorySize = ... //optional max split size in bytes minimum is 
10MB. If not set, maxSplitMemorySize = 64 MB
- * Source cassandraSource = new CassandraSource(clusterBuilder,
- *                                              maxSplitMemorySize,
- *                                              Pojo.class,
- *                                              "select ... from 
KEYSPACE.TABLE ...;",
- *                                              () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
- *
- * DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),
- * "CassandraSource");
+ *
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)})
+ *     .buildForPojo(MyPojo.class);
+ *
+ * DataStream<MyPojo> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "CassandraSource");

Review Comment:
   nit picking: if you rename to MyPojo, also rename the Pojo class in the 
tests or leave Pojo



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setClusterBuilder(ClusterBuilder 
clusterBuilder) {
+        this.clusterBuilder = clusterBuilder;
+        return this;
+    }
+
+    /**
+     * Sets the CQL query to execute.
+     *
+     * <p>Query must be a simple SELECT statement without aggregations, ORDER 
BY, or GROUP BY
+     * clauses, as these operations are not supported when the query is 
executed on data partitions.
+     *
+     * @param query the CQL query string

Review Comment:
   refer the source doc for SQL limitations 



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -36,54 +36,99 @@
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
 import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
 import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
 import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
+import com.datastax.driver.mapping.MappingManager;
+
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A bounded source to read from Cassandra and return a collection of entities 
as {@code
- * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
- * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
- * in <a
- * 
href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/";>
- * Cassandra object mapper</a>).
+ * A bounded source to read from Cassandra and return data as a {@code 
DataStream<OUT>} where OUT
+ * can be any type determined by the provided converter. This source uses a 
pluggable converter
+ * interface that supports both {@link org.apache.flink.table.api.Table} and 
{@link
+ * org.apache.flink.streaming.api.datastream.DataStream}:
+ *
+ * <ul>
+ *   <li><b>DataStream API with POJOs</b>: Uses {@link 
CassandraRowToPojoConverter} with Cassandra's
+ *       {@link MappingManager} to convert cassandra's {@link 
com.datastax.driver.core.Row} to
+ *       annotated POJOs
+ *   <li><b>Table/SQL API</b>: Uses {@link CassandraRowToRowDataConverter} 
with field-based mapping
+ *       to convert rows to {@link org.apache.flink.table.data.RowData}
+ * </ul>
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>1. DataStream API with POJO (Object Mapper):</h4>
  *
- * <p>To use it, do the following:
+ * <p>For DataStream applications using POJOs with Cassandra annotations:
  *
  * <pre>{@code
  * ClusterBuilder clusterBuilder = new ClusterBuilder() {
  *   @Override
  *   protected Cluster buildCluster(Cluster.Builder builder) {
- *     return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *     return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, 
PORT))
  *                   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
- *                   .withSocketOptions(new SocketOptions()
- *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
- *                   .setReadTimeoutMillis(READ_TIMEOUT))

Review Comment:
   agree: this comes from datastax builder. Why document these particular items 
and not the others? Better to remove this doc.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})

Review Comment:
   consistency levels used to be configured in the clusterBuilder within the 
QueryOptions. Here it is only configured for mapping operations no (pojo API 
case) ?



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setClusterBuilder(ClusterBuilder 
clusterBuilder) {
+        this.clusterBuilder = clusterBuilder;
+        return this;
+    }
+
+    /**
+     * Sets the CQL query to execute.
+     *
+     * <p>Query must be a simple SELECT statement without aggregations, ORDER 
BY, or GROUP BY
+     * clauses, as these operations are not supported when the query is 
executed on data partitions.
+     *
+     * @param query the CQL query string
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setQuery(String query) {
+        this.query = query;
+        return this;
+    }
+
+    /**
+     * Sets the maximum memory size for each split. Larger tables will be 
divided into multiple
+     * splits based on this size.
+     *
+     * <p>Default: 64MB, Minimum: 10MB
+     *
+     * @param maxSplitMemorySize the maximum memory size per split
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMaxSplitMemorySize(MemorySize 
maxSplitMemorySize) {
+        this.maxSplitMemorySize = maxSplitMemorySize.getBytes();
+        return this;
+    }
+
+    /**
+     * Sets mapper options for POJO mapping configuration.
+     *
+     * <p>Optional. If not set, default mapper options will be used.
+     *
+     * @param mapperOptions the mapper options for DataStax object mapper
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMapperOptions(MapperOptions 
mapperOptions) {
+        this.mapperOptions = mapperOptions;
+        return this;
+    }
+
+    /**
+     * Builds a CassandraSource configured for POJO output using DataStax 
object mapper.
+     *
+     * <p>The POJO class must be annotated with DataStax mapping annotations 
(e.g., {@code @Table},
+     * {@code @Column}).
+     *
+     * @param <T> the POJO type
+     * @param pojoClass the POJO class to map rows to
+     * @return the configured CassandraSource instance
+     * @throws IllegalStateException if required parameters are missing or 
invalid
+     */
+    public <T> CassandraSource<T> forPojo(Class<T> pojoClass) {
+        validateCommonParameters();
+        Objects.requireNonNull(pojoClass, "POJO class is required");
+
+        MapperOptions options =
+                mapperOptions != null
+                        ? mapperOptions
+                        : () -> new 
com.datastax.driver.mapping.Mapper.Option[0];
+
+        CassandraRowToPojoConverter<T> converter =
+                new CassandraRowToPojoConverter<>(pojoClass, options, 
clusterBuilder);
+
+        return new CassandraSource<>(clusterBuilder, maxSplitMemorySize, 
converter, query);
+    }
+
+    /**
+     * Builds a CassandraSource configured for RowData output for Table API.

Review Comment:
   `{@link CassandraSource}`



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRowEmitter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Unified record emitter that uses the strategy pattern for deserialization.
+ *
+ * @param <OUT> The output type
+ */
+@Internal

Review Comment:
   this class is package local in 
org.apache.flink.connector.cassandra.source.reader so not visible to the user. 
No need to annotate it Internal (it is only for public classes that are user 
visible by construction but that should not be used by the user)



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java:
##########
@@ -41,6 +41,7 @@
  *
  * @param <OUT> type of POJO record to output
  */
+@Deprecated

Review Comment:
   same here: CassandraRecordEmitter is an internal class, no need for 
deprecation. It is replaced by CassandraRowEmitter, it is no more used, you can 
safely remove it.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CassandraFieldMapperFactory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Factory for creating appropriate field mappers based on Flink logical 
types. */
+@Internal
+public final class CassandraFieldMapperFactory {
+
+    private CassandraFieldMapperFactory() {}
+
+    /**
+     * Creates a field mapper for the given logical type. Different mappers 
for different {@link
+     * com.datastax.driver.core.DataType}
+     *
+     * @param logicalType the Flink logical type
+     * @return appropriate field mapper for the type
+     */
+    public static CassandraFieldMapper createFieldMapper(LogicalType 
logicalType) {
+        LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
+
+        switch (typeRoot) {
+                // Supported primitive types
+            case BOOLEAN:
+                return new PrimitiveFieldMappers.BooleanMapper();
+            case TINYINT:
+                return new PrimitiveFieldMappers.ByteMapper();
+            case SMALLINT:
+                return new PrimitiveFieldMappers.ShortMapper();
+            case INTEGER:
+                return new PrimitiveFieldMappers.IntegerMapper();
+            case BIGINT:
+                return new PrimitiveFieldMappers.LongMapper();
+            case FLOAT:
+                return new PrimitiveFieldMappers.FloatMapper();
+            case DOUBLE:
+                return new PrimitiveFieldMappers.DoubleMapper();
+            case VARCHAR:
+            case CHAR:
+                return new PrimitiveFieldMappers.StringMapper();
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) logicalType;
+                return new 
PrimitiveFieldMappers.DynamicDecimalMapper(decimalType);
+            case DATE:
+                return new PrimitiveFieldMappers.DateMapper();
+            case TIME_WITHOUT_TIME_ZONE:
+                return new PrimitiveFieldMappers.TimeMapper();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new PrimitiveFieldMappers.TimestampMapper();
+            case BINARY:
+            case VARBINARY:
+                return new PrimitiveFieldMappers.BinaryMapper();
+
+                // Supported collection types
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) logicalType;
+                CassandraFieldMapper elementMapper = 
createFieldMapper(arrayType.getElementType());
+                return new CollectionFieldMappers.ArrayMapper(elementMapper);
+            case MAP:
+                MapType mapType = (MapType) logicalType;
+                CassandraFieldMapper keyMapper = 
createFieldMapper(mapType.getKeyType());
+                CassandraFieldMapper valueMapper = 
createFieldMapper(mapType.getValueType());
+                return new CollectionFieldMappers.MapMapper(keyMapper, 
valueMapper);
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) logicalType;
+                CassandraFieldMapper setElementMapper =
+                        createFieldMapper(multisetType.getElementType());
+                return new CollectionFieldMappers.SetMapper(setElementMapper);
+            case ROW:
+                RowType rowType = (RowType) logicalType;
+                CassandraFieldMapper[] fieldMappers =
+                        new CassandraFieldMapper[rowType.getFieldCount()];
+                String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+
+                for (int i = 0; i < rowType.getFieldCount(); i++) {
+                    fieldMappers[i] = createFieldMapper(rowType.getTypeAt(i));
+                }
+                return new CollectionFieldMappers.RowMapper(fieldMappers, 
fieldNames);
+
+                // Timezone-aware types - not supported by Cassandra
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(

Review Comment:
   use ValidationException as in other connectors. In general in your code use 
it for type mapping exceptions
   



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)

Review Comment:
   document how to create the cluster builder as it was done in the source doc



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSourceBuilder.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToPojoConverter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToRowDataConverter;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link CassandraSource} that provides a fluent API for 
configuring Cassandra source
+ * parameters.
+ *
+ * <h3>Usage Examples:</h3>
+ *
+ * <h4>POJO Source (DataStream API):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<MyPojo> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .setMapperOptions(() -> new Mapper.Option[] 
{Mapper.Option.consistencyLevel(ANY)})
+ *     .setMaxSplitMemorySize(MemorySize.ofMebiBytes(32))
+ *     .forPojo(MyPojo.class);
+ * }</pre>
+ *
+ * <h4>RowData Source (Table API - typically used internally):</h4>
+ *
+ * <pre>{@code
+ * CassandraSource<RowData> source = CassandraSource.builder()
+ *     .setClusterBuilder(clusterBuilder)
+ *     .setQuery("SELECT * FROM my_keyspace.my_table")
+ *     .forRowData(rowType);
+ * }</pre>
+ */
+@PublicEvolving
+public final class CassandraSourceBuilder {
+
+    private ClusterBuilder clusterBuilder;
+    private String query;
+    private long maxSplitMemorySize = 
CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT;
+    private MapperOptions mapperOptions;
+
+    CassandraSourceBuilder() {}
+
+    /**
+     * Sets the cluster builder for connecting to Cassandra cluster.
+     *
+     * @param clusterBuilder the cluster builder configuration
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setClusterBuilder(ClusterBuilder 
clusterBuilder) {
+        this.clusterBuilder = clusterBuilder;
+        return this;
+    }
+
+    /**
+     * Sets the CQL query to execute.
+     *
+     * <p>Query must be a simple SELECT statement without aggregations, ORDER 
BY, or GROUP BY
+     * clauses, as these operations are not supported when the query is 
executed on data partitions.
+     *
+     * @param query the CQL query string
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setQuery(String query) {
+        this.query = query;
+        return this;
+    }
+
+    /**
+     * Sets the maximum memory size for each split. Larger tables will be 
divided into multiple
+     * splits based on this size.
+     *
+     * <p>Default: 64MB, Minimum: 10MB
+     *
+     * @param maxSplitMemorySize the maximum memory size per split
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMaxSplitMemorySize(MemorySize 
maxSplitMemorySize) {
+        this.maxSplitMemorySize = maxSplitMemorySize.getBytes();
+        return this;
+    }
+
+    /**
+     * Sets mapper options for POJO mapping configuration.
+     *
+     * <p>Optional. If not set, default mapper options will be used.
+     *
+     * @param mapperOptions the mapper options for DataStax object mapper
+     * @return this builder instance
+     */
+    public CassandraSourceBuilder setMapperOptions(MapperOptions 
mapperOptions) {
+        this.mapperOptions = mapperOptions;
+        return this;
+    }
+
+    /**
+     * Builds a CassandraSource configured for POJO output using DataStax 
object mapper.
+     *
+     * <p>The POJO class must be annotated with DataStax mapping annotations 
(e.g., {@code @Table},
+     * {@code @Column}).
+     *
+     * @param <T> the POJO type
+     * @param pojoClass the POJO class to map rows to
+     * @return the configured CassandraSource instance
+     * @throws IllegalStateException if required parameters are missing or 
invalid
+     */
+    public <T> CassandraSource<T> forPojo(Class<T> pojoClass) {
+        validateCommonParameters();
+        Objects.requireNonNull(pojoClass, "POJO class is required");
+
+        MapperOptions options =
+                mapperOptions != null
+                        ? mapperOptions
+                        : () -> new 
com.datastax.driver.mapping.Mapper.Option[0];
+
+        CassandraRowToPojoConverter<T> converter =
+                new CassandraRowToPojoConverter<>(pojoClass, options, 
clusterBuilder);
+
+        return new CassandraSource<>(clusterBuilder, maxSplitMemorySize, 
converter, query);
+    }
+
+    /**
+     * Builds a CassandraSource configured for RowData output for Table API.
+     *
+     * <p>This method is typically used internally by the Table API factory.
+     *
+     * @param rowType the logical row type definition
+     * @return the configured CassandraSource instance
+     * @throws IllegalStateException if required parameters are missing or 
invalid
+     */
+    public CassandraSource<RowData> forRowData(RowType rowType) {
+        validateCommonParameters();
+        Objects.requireNonNull(rowType, "RowType is required");
+
+        CassandraRowToRowDataConverter converter = new 
CassandraRowToRowDataConverter(rowType);
+
+        return new CassandraSource<>(clusterBuilder, maxSplitMemorySize, 
converter, query);
+    }
+
+    private void validateCommonParameters() {
+        Objects.requireNonNull(clusterBuilder, "ClusterBuilder is required");

Review Comment:
   use flink preconditions and in all other places where you use Objects 
preconditions



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CassandraFieldMapper.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+
+import com.datastax.driver.core.Row;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for all Cassandra field mappers that convert values to 
Flink's internal {@link
+ * org.apache.flink.table.types.logical.LogicalTypeRoot}.
+ */
+@Internal
+public interface CassandraFieldMapper extends Serializable {
+
+    /**
+     * Extract a field value from a Cassandra Row and convert it to Flink 
internal format.
+     *
+     * <p>This method is used when reading from the Cassandra {@link Row} 
object returned by a
+     * query. It handles null checking and uses Cassandra's type-specific 
getters (getString,
+     * getInt, etc.) to safely extract the field value before converting it to 
Flink's internal
+     * representation.
+     *
+     * @param row the Cassandra {@link Row} object containing the field
+     * @param fieldName the name of the field to extract
+     * @return the field value converted to Flink internal format, or null if 
the field is null
+     */
+    Object extractFromRow(Row row, String fieldName);
+
+    /**
+     * Convert a raw value to Flink internal format.
+     *
+     * <p>This method is used when we already have an extracted value (not 
from a Row) that needs to
+     * be converted to Flink's internal format. Common scenarios include:
+     *
+     * <ul>
+     *   <li>Array elements: converting individual items from a List to Flink 
format
+     *   <li>Map values: converting keys/values from a Map to Flink format
+     *   <li>UDT fields: converting individual fields extracted from a UDTValue
+     * </ul>
+     *
+     * <p>Default implementation returns the value as-is (no conversion needed 
for primitive types).
+     * Override this method when conversion to Flink internal format is 
required.
+     *
+     * @param value the raw value to convert (String, Integer, UDTValue, etc.)
+     * @return the value converted to Flink internal format, or null if the 
input value is null
+     */
+    default Object convertValue(Object value) {

Review Comment:
   Some changes:
   - annotate with Nullable for static analysis
   - I'd rename to convertValueToFlinkType for clarity.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRowEmitter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import 
org.apache.flink.connector.cassandra.source.reader.converter.CassandraRowToTypeConverter;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Unified record emitter that uses the strategy pattern for deserialization.
+ *
+ * @param <OUT> The output type
+ */
+@Internal
+class CassandraRowEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, 
CassandraSplit> {
+
+    private final CassandraRowToTypeConverter<OUT> converter;
+
+    public CassandraRowEmitter(CassandraRowToTypeConverter<OUT> converter) {
+        this.converter = requireNonNull(converter, "Converter cannot be null");

Review Comment:
   same here: use Flink utils.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/PrimitiveFieldMappers.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import com.datastax.driver.core.Duration;
+import com.datastax.driver.core.LocalDate;
+import com.datastax.driver.core.Row;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Date;
+
+/** Collection of primitive type field mappers for Cassandra to Flink 
conversion. */
+@Internal
+public final class PrimitiveFieldMappers {

Review Comment:
   This class is used through GenericRowData internal fields. But the doc of 
that class says "Note: All fields of this data structure must be internal data 
structures. See RowData for more information about internal data structures." 
And in this mapper direct primitive types (int, long, byte[], ...) are returned 
and directly assigned in RowToRowDataMapper without wrapping in RowData



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CassandraFieldMapper.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+
+import com.datastax.driver.core.Row;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for all Cassandra field mappers that convert values to 
Flink's internal {@link
+ * org.apache.flink.table.types.logical.LogicalTypeRoot}.
+ */
+@Internal
+public interface CassandraFieldMapper extends Serializable {
+
+    /**
+     * Extract a field value from a Cassandra Row and convert it to Flink 
internal format.
+     *
+     * <p>This method is used when reading from the Cassandra {@link Row} 
object returned by a
+     * query. It handles null checking and uses Cassandra's type-specific 
getters (getString,
+     * getInt, etc.) to safely extract the field value before converting it to 
Flink's internal
+     * representation.
+     *
+     * @param row the Cassandra {@link Row} object containing the field
+     * @param fieldName the name of the field to extract
+     * @return the field value converted to Flink internal format, or null if 
the field is null
+     */
+    Object extractFromRow(Row row, String fieldName);
+
+    /**
+     * Convert a raw value to Flink internal format.
+     *
+     * <p>This method is used when we already have an extracted value (not 
from a Row) that needs to

Review Comment:
   "(not from a Row)" is misleading as this method is called in many cases from 
an extracted field of a row. Maybe rephrase to "when we already have a raw 
value that needs to be converted to Flink's internal format"



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/table/mapper/CollectionFieldMappers.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.table.mapper;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TupleValue;
+import com.datastax.driver.core.UDTValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Collection field mappers for handling ARRAY and MAP types. */
+@Internal
+public final class CollectionFieldMappers {
+
+    private CollectionFieldMappers() {}
+
+    /** Array field mapper that delegates element mapping to provided element 
mapper. */
+    public static final class ArrayMapper implements CassandraFieldMapper {
+        private final CassandraFieldMapper fieldMapper;
+
+        public ArrayMapper(CassandraFieldMapper fieldMapper) {
+            this.fieldMapper = fieldMapper;
+        }
+
+        @Override
+        public Object extractFromRow(Row row, String fieldName) {
+            if (row.isNull(fieldName)) {
+                return null;
+            }
+            Object rawValue = row.getObject(fieldName);
+            return convertValue(rawValue);
+        }
+
+        @Override
+        public Object convertValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+
+            if (!(value instanceof List)) {
+                throw new IllegalArgumentException("Expected List, got: " + 
value.getClass());

Review Comment:
   same here and in all exceptions in type mapping system



##########
flink-connector-cassandra/archunit-violations/fdc0cb7f-d8e1-4703-bc93-8745dbe93bd6:
##########
@@ -0,0 +1,2 @@
+org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.setClusterBuilder(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder):
 Argument leaf type 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated

Review Comment:
   same for MapperOptions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to