This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 72e3bef [FLINK-26822] Add Cassandra Source
72e3bef is described below
commit 72e3bef1fb9ee6042955b5e9871a9f70a8837cca
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Mar 22 09:38:17 2023 +0100
[FLINK-26822] Add Cassandra Source
---
flink-connector-cassandra/pom.xml | 7 +
.../cassandra/source/CassandraSource.java | 203 +++++++++++++++++
.../enumerator/CassandraEnumeratorState.java | 167 ++++++++++++++
.../CassandraEnumeratorStateSerializer.java | 106 +++++++++
.../enumerator/CassandraSplitEnumerator.java | 148 +++++++++++++
.../source/reader/CassandraRecordEmitter.java | 138 ++++++++++++
.../cassandra/source/reader/CassandraRow.java | 46 ++++
.../source/reader/CassandraSourceReader.java | 104 +++++++++
.../reader/CassandraSourceReaderFactory.java | 55 +++++
.../source/reader/CassandraSplitReader.java | 205 +++++++++++++++++
.../cassandra/source/split/CassandraSplit.java | 75 +++++++
.../source/split/CassandraSplitSerializer.java | 63 ++++++
.../cassandra/source/split/SplitsGenerator.java | 242 +++++++++++++++++++++
.../source/utils/BigIntegerSerializationUtils.java | 40 ++++
.../cassandra/example/BatchPojoExample.java | 1 +
.../cassandra/source/CassandraSourceITCase.java | 234 ++++++++++++++++++++
.../cassandra/source/CassandraTestContext.java | 161 ++++++++++++++
.../cassandra/source/CassandraTestEnvironment.java | 196 +++++++++++++++++
.../CassandraEnumeratorStateSerializerTest.java | 58 +++++
.../source/reader/CassandraQueryTest.java | 119 ++++++++++
.../source/split/CassandraSplitSerializerTest.java | 43 ++++
.../cassandra/utils}/Pojo.java | 26 ++-
tools/maven/suppressions.xml | 2 +-
23 files changed, 2437 insertions(+), 2 deletions(-)
diff --git a/flink-connector-cassandra/pom.xml b/flink-connector-cassandra/pom.xml
index fa78a65..58d70b4 100644
--- a/flink-connector-cassandra/pom.xml
+++ b/flink-connector-cassandra/pom.xml
@@ -78,6 +78,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
new file mode 100644
index 0000000..dd45913
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
+import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bounded source to read from Cassandra and return a collection of entities as {@code
+ * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
+ * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described
+ * in <a
+ * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/">
+ * Cassandra object mapper</a>).
+ *
+ * <p>To use it, do the following:
+ *
+ * <pre>{@code
+ * ClusterBuilder clusterBuilder = new ClusterBuilder() {
+ * @Override
+ * protected Cluster buildCluster(Cluster.Builder builder) {
+ * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT))
+ * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
+ * .withSocketOptions(new SocketOptions()
+ * .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+ * .setReadTimeoutMillis(READ_TIMEOUT))
+ * .build();
+ * }
+ * };
+ * long maxSplitMemorySize = ... // optional max split size in bytes. If not set, a 64 MB default (MAX_SPLIT_MEMORY_SIZE_DEFAULT) is used
+ * Source cassandraSource = new CassandraSource(clusterBuilder,
+ * maxSplitMemorySize,
+ * Pojo.class,
+ * "select ... from KEYSPACE.TABLE ...;",
+ * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+ *
+ * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(),
+ * "CassandraSource");
+ * }</pre>
+ */
+@PublicEvolving
+public class CassandraSource<OUT>
+ implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> {
+
+ public static final Pattern CQL_PROHIBITED_CLAUSES_REGEXP =
+ Pattern.compile("(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*");
+ public static final Pattern SELECT_REGEXP =
+ Pattern.compile("(?i)select .+ from (\\w+)\\.(\\w+).*;$");
+
+ private static final long serialVersionUID = 1L;
+
+ private final ClusterBuilder clusterBuilder;
+ private final Class<OUT> pojoClass;
+ private final String query;
+ private final String keyspace;
+ private final String table;
+ private final MapperOptions mapperOptions;
+
+ private final long maxSplitMemorySize;
+ private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(10).getBytes();
+ static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = MemorySize.ofMebiBytes(64).getBytes();
+
+ public CassandraSource(
+ ClusterBuilder clusterBuilder,
+ Class<OUT> pojoClass,
+ String query,
+ MapperOptions mapperOptions) {
+ this(clusterBuilder, MAX_SPLIT_MEMORY_SIZE_DEFAULT, pojoClass, query, mapperOptions);
+ }
+
+ public CassandraSource(
+ ClusterBuilder clusterBuilder,
+ long maxSplitMemorySize,
+ Class<OUT> pojoClass,
+ String query,
+ MapperOptions mapperOptions) {
+ checkNotNull(clusterBuilder, "ClusterBuilder required but not provided");
+ checkNotNull(pojoClass, "POJO class required but not provided");
+ checkNotNull(query, "query required but not provided");
+ checkState(
+ maxSplitMemorySize >= MIN_SPLIT_MEMORY_SIZE,
+ "Defined maxSplitMemorySize (%s) is below minimum (%s)",
+ maxSplitMemorySize,
+ MIN_SPLIT_MEMORY_SIZE);
+ this.maxSplitMemorySize = maxSplitMemorySize;
+ final Matcher queryMatcher = checkQueryValidity(query);
+ this.query = query;
+ this.keyspace = queryMatcher.group(1);
+ this.table = queryMatcher.group(2);
+ this.clusterBuilder = clusterBuilder;
+ ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ this.pojoClass = pojoClass;
+ this.mapperOptions = mapperOptions;
+ }
+
+ @VisibleForTesting
+ public static Matcher checkQueryValidity(String query) {
+ checkState(
+ !query.matches(CQL_PROHIBITED_CLAUSES_REGEXP.pattern()),
+ "Aggregations/OrderBy are not supported because the query is executed on subsets/partitions of the input table");
+ final Matcher queryMatcher = SELECT_REGEXP.matcher(query);
+ checkState(
+ queryMatcher.matches(),
+ "Query must be of the form select ... from keyspace.table ...;");
+ return queryMatcher;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Internal
+ @Override
+ public SourceReader<OUT, CassandraSplit> createReader(SourceReaderContext readerContext) {
+ return new CassandraSourceReaderFactory<OUT>()
+ .create(
+ readerContext,
+ clusterBuilder,
+ pojoClass,
+ query,
+ keyspace,
+ table,
+ mapperOptions);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> createEnumerator(
+ SplitEnumeratorContext<CassandraSplit> enumContext) {
+ return new CassandraSplitEnumerator(
+ enumContext, null, clusterBuilder, maxSplitMemorySize, keyspace, table);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> restoreEnumerator(
+ SplitEnumeratorContext<CassandraSplit> enumContext,
+ CassandraEnumeratorState enumCheckpoint) {
+ return new CassandraSplitEnumerator(
+ enumContext, enumCheckpoint, clusterBuilder, maxSplitMemorySize, keyspace, table);
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer<CassandraSplit> getSplitSerializer() {
+ return CassandraSplitSerializer.INSTANCE;
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer<CassandraEnumeratorState> getEnumeratorCheckpointSerializer() {
+ return CassandraEnumeratorStateSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return TypeInformation.of(pojoClass);
+ }
+}
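As a rough illustration of the query validation described in the class Javadoc above (a sketch only; "ks" and "tbl" are placeholder keyspace/table names):

import org.apache.flink.connector.cassandra.source.CassandraSource;

public class QueryValidationSketch {
    public static void main(String[] args) {
        // Accepted: a plain SELECT on keyspace.table, terminated by ';'
        CassandraSource.checkQueryValidity("select field1, field2 from ks.tbl where field1 = 'a';");

        // Rejected: aggregations/ORDER BY/GROUP BY are prohibited because the query
        // is executed per split (per token range), not on the whole table
        try {
            CassandraSource.checkQueryValidity("select count(*) from ks.tbl;");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}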
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java
new file mode 100644
index 0000000..c16fe13
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/**
+ * State for {@link CassandraSplitEnumerator}. It stores the offset ({@code startToken}) of the
+ * last lazy {@link CassandraSplit} generation and the number of splits left to generate. Upon
+ * restoration of this state, {@link SplitsGenerator#prepareSplits()} is not re-run, so we also
+ * need to store the result of this initial splits preparation ({@code increment} and
+ * {@code maxToken}).
+ */
+public class CassandraEnumeratorState {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraEnumeratorState.class);
+ private long numSplitsLeftToGenerate;
+ private BigInteger increment;
+ private BigInteger startToken;
+ private BigInteger maxToken;
+ // splits that were assigned to a failed reader and that were not part of a checkpoint, so after
+ // restoration, they need to be reassigned.
+ private final Queue<CassandraSplit> splitsToReassign;
+
+ CassandraEnumeratorState() {
+ this.splitsToReassign = new ArrayDeque<>();
+ }
+
+ public CassandraEnumeratorState(
+ long numSplitsLeftToGenerate,
+ BigInteger increment,
+ BigInteger startToken,
+ BigInteger maxToken,
+ Queue<CassandraSplit> splitsToReassign) {
+ this.numSplitsLeftToGenerate = numSplitsLeftToGenerate;
+ this.increment = increment;
+ this.startToken = startToken;
+ this.maxToken = maxToken;
+ this.splitsToReassign = splitsToReassign;
+ }
+
+ Queue<CassandraSplit> getSplitsToReassign() {
+ return splitsToReassign;
+ }
+
+ public long getNumSplitsLeftToGenerate() {
+ return numSplitsLeftToGenerate;
+ }
+
+ BigInteger getIncrement() {
+ return increment;
+ }
+
+ BigInteger getStartToken() {
+ return startToken;
+ }
+
+ BigInteger getMaxToken() {
+ return maxToken;
+ }
+
+ void addSplitsBack(Collection<CassandraSplit> splits) {
+ LOG.info(
+ "Add {} splits back to CassandraSplitEnumerator for reassignment after failover",
+ splits.size());
+ splitsToReassign.addAll(splits);
+ }
+
+ /**
+ * Provide a {@link CassandraSplit} that was assigned to a failed reader or lazily create one.
+ * Splits contain a range of the Cassandra ring that amounts to roughly {@code maxSplitMemorySize}.
+ * There is no way to estimate the size of the data with the optional CQL filters without reading
+ * the data, so the split can be smaller than {@code maxSplitMemorySize} when the query is actually
+ * executed.
+ */
+ public @Nullable CassandraSplit getNextSplit() {
+ // serve splits to reassign first
+ final CassandraSplit splitToReassign = splitsToReassign.poll();
+ if (splitToReassign != null) {
+ return splitToReassign;
+ } // else no more splits to reassign, generate one
+ if (numSplitsLeftToGenerate == 0) {
+ return null; // enumerator will send the no more split message to the requesting reader
+ }
+ BigInteger endToken =
+ numSplitsLeftToGenerate == 1
+ // last split to generate, round up to the last token of the ring
+ ? maxToken
+ : startToken.add(increment);
+ CassandraSplit split = new CassandraSplit(startToken, endToken);
+ // prepare for next call
+ this.startToken = endToken;
+ numSplitsLeftToGenerate--;
+ return split;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CassandraEnumeratorState that = (CassandraEnumeratorState) o;
+ if (this.splitsToReassign.size() != that.splitsToReassign.size()) {
+ return false;
+ }
+ for (CassandraSplit cassandraSplit : splitsToReassign) {
+ if (!that.splitsToReassign.contains(cassandraSplit)) {
+ return false;
+ }
+ }
+ return numSplitsLeftToGenerate == that.numSplitsLeftToGenerate
+ && increment.equals(that.increment)
+ && startToken.equals(that.startToken)
+ && maxToken.equals(that.maxToken);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign);
+ }
+
+ @Override
+ public String toString() {
+ return "CassandraEnumeratorState{"
+ + "numSplitsLeftToGenerate="
+ + numSplitsLeftToGenerate
+ + ", increment="
+ + increment
+ + ", startToken="
+ + startToken
+ + ", maxToken="
+ + maxToken
+ + ", splitsToReassign="
+ + splitsToReassign
+ + '}';
+ }
+}
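To make the lazy split generation above concrete, a small sketch that drives getNextSplit() with made-up token values (a real Murmur3 ring spans a much larger token range):

import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;

import java.math.BigInteger;
import java.util.ArrayDeque;

public class LazySplitGenerationSketch {
    public static void main(String[] args) {
        // A toy ring of tokens 0..100 cut into 4 slices of 25 tokens each.
        CassandraEnumeratorState state =
                new CassandraEnumeratorState(
                        4,                       // numSplitsLeftToGenerate
                        BigInteger.valueOf(25),  // increment: split size in tokens
                        BigInteger.ZERO,         // startToken of the next split
                        BigInteger.valueOf(100), // maxToken of the ring
                        new ArrayDeque<>());     // no splits to reassign

        CassandraSplit split;
        while ((split = state.getNextSplit()) != null) {
            System.out.println(split); // prints (0,25) (25,50) (50,75) (75,100)
        }
    }
}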
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
new file mode 100644
index 0000000..3725d47
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** Serializer for {@link CassandraEnumeratorState}. */
+public class CassandraEnumeratorStateSerializer
+ implements SimpleVersionedSerializer<CassandraEnumeratorState> {
+
+ public static final CassandraEnumeratorStateSerializer INSTANCE =
+ new CassandraEnumeratorStateSerializer();
+ public static final int CURRENT_VERSION = 0;
+
+ private CassandraEnumeratorStateSerializer() {}
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException {
+ try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ final ObjectOutputStream objectOutputStream =
+ new ObjectOutputStream(byteArrayOutputStream)) {
+ final Queue<CassandraSplit> splitsToReassign =
+ cassandraEnumeratorState.getSplitsToReassign();
+ objectOutputStream.writeInt(splitsToReassign.size());
+ for (CassandraSplit cassandraSplit : splitsToReassign) {
+ final byte[] serializedSplit =
+ CassandraSplitSerializer.INSTANCE.serialize(cassandraSplit);
+ objectOutputStream.writeInt(serializedSplit.length);
+ objectOutputStream.write(serializedSplit);
+ }
+
+ objectOutputStream.writeLong(cassandraEnumeratorState.getNumSplitsLeftToGenerate());
+ BigIntegerSerializationUtils.write(
+ cassandraEnumeratorState.getIncrement(), objectOutputStream);
+ BigIntegerSerializationUtils.write(
+ cassandraEnumeratorState.getStartToken(), objectOutputStream);
+ BigIntegerSerializationUtils.write(
+ cassandraEnumeratorState.getMaxToken(), objectOutputStream);
+
+ objectOutputStream.flush();
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ @Override
+ public CassandraEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+ try (final ByteArrayInputStream byteArrayInputStream =
+ new ByteArrayInputStream(serialized);
+ final ObjectInputStream objectInputStream =
+ new ObjectInputStream(byteArrayInputStream)) {
+ final Queue<CassandraSplit> splitsToReassign = new ArrayDeque<>();
+ final int splitsToReassignSize = objectInputStream.readInt();
+ for (int i = 0; i < splitsToReassignSize; i++) {
+ final int splitSize = objectInputStream.readInt();
+ final byte[] splitBytes = new byte[splitSize];
+ IOUtils.readFully(objectInputStream, splitBytes, 0, splitSize);
+ final CassandraSplit split =
+ CassandraSplitSerializer.INSTANCE.deserialize(
+ CassandraSplitSerializer.CURRENT_VERSION, splitBytes);
+ splitsToReassign.add(split);
+ }
+
+ final long numSplitsLeftToGenerate = objectInputStream.readLong();
+ final BigInteger increment = BigIntegerSerializationUtils.read(objectInputStream);
+ final BigInteger startToken = BigIntegerSerializationUtils.read(objectInputStream);
+ final BigInteger maxToken = BigIntegerSerializationUtils.read(objectInputStream);
+
+ return new CassandraEnumeratorState(
+ numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign);
+ }
+ }
+}
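A quick round-trip sketch for the serializer above (values are arbitrary; it relies only on the public constructor, INSTANCE and equals shown in this patch):

import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;

import java.math.BigInteger;
import java.util.ArrayDeque;
import java.util.Queue;

public class EnumeratorStateRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // one split pending reassignment, two splits left to generate
        Queue<CassandraSplit> toReassign = new ArrayDeque<>();
        toReassign.add(new CassandraSplit(BigInteger.ZERO, BigInteger.valueOf(50)));

        CassandraEnumeratorState state =
                new CassandraEnumeratorState(
                        2, BigInteger.valueOf(25), BigInteger.valueOf(50),
                        BigInteger.valueOf(100), toReassign);

        CassandraEnumeratorStateSerializer serializer = CassandraEnumeratorStateSerializer.INSTANCE;
        byte[] bytes = serializer.serialize(state);
        CassandraEnumeratorState restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(state.equals(restored)); // true
    }
}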
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java
new file mode 100644
index 0000000..1337846
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+
+/** {@link SplitEnumerator} that splits the Cassandra cluster into {@link CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+ implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+
+ private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+ private CassandraEnumeratorState state;
+ private final Cluster cluster;
+ private final Long maxSplitMemorySize;
+ private final Session session;
+ private final String keyspace;
+ private final String table;
+
+ public CassandraSplitEnumerator(
+ SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+ CassandraEnumeratorState state,
+ ClusterBuilder clusterBuilder,
+ Long maxSplitMemorySize,
+ String keyspace,
+ String table) {
+ this.enumeratorContext = enumeratorContext;
+ this.state = state == null ? new CassandraEnumeratorState() : state /* snapshot restore */;
+ this.cluster = clusterBuilder.getCluster();
+ this.maxSplitMemorySize = maxSplitMemorySize;
+ this.session = cluster.newSession();
+ this.keyspace = keyspace;
+ this.table = table;
+ }
+
+ @Override
+ public void start() {
+ enumeratorContext.callAsync(
+ this::prepareSplits,
+ (preparedState, throwable) -> {
+ LOG.debug("Initialized CassandraEnumeratorState: {}", preparedState.toString());
+ state = preparedState;
+ });
+ }
+
+ private CassandraEnumeratorState prepareSplits() {
+ final int parallelism = enumeratorContext.currentParallelism();
+ final String partitionerName = cluster.getMetadata().getPartitioner();
+ final SplitsGenerator.CassandraPartitioner partitioner =
+ partitionerName.contains(MURMUR3PARTITIONER.getClassName())
+ ? MURMUR3PARTITIONER
+ : RANDOMPARTITIONER;
+ final SplitsGenerator splitsGenerator =
+ new SplitsGenerator(
+ partitioner, session, keyspace, table, parallelism, maxSplitMemorySize);
+ return splitsGenerator.prepareSplits();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ checkReaderRegistered(subtaskId);
+ final CassandraSplit cassandraSplit = state.getNextSplit();
+ if (cassandraSplit != null) {
+ LOG.info("Assigning splits to reader {}", subtaskId);
+ enumeratorContext.assignSplit(cassandraSplit, subtaskId);
+ } else {
+ LOG.info(
+ "No split assigned to reader {} because the enumerator has no unassigned split left. Sending NoMoreSplitsEvent to reader",
+ subtaskId);
+ enumeratorContext.signalNoMoreSplits(subtaskId);
+ }
+ }
+
+ @Override
+ public void addSplitsBack(List<CassandraSplit> splits, int subtaskId) {
+ // splits that were assigned to a failed reader and that were not part of a checkpoint, so
+ // after restoration, they need to be reassigned
+ state.addSplitsBack(splits);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // nothing to do on reader registration as the CassandraSplits are generated lazily
+ }
+
+ private void checkReaderRegistered(int readerId) {
+ if (!enumeratorContext.registeredReaders().containsKey(readerId)) {
+ throw new IllegalStateException(
+ String.format("Reader %d is not registered to source coordinator", readerId));
+ }
+ }
+
+ @Override
+ public CassandraEnumeratorState snapshotState(long checkpointId) {
+ return state;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java
new file mode 100644
index 0000000..8287057
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ExecutionInfo;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * {@link RecordEmitter} that converts the {@link CassandraRow} read by the {@link
+ * CassandraSplitReader} to the specified POJO and outputs it. This class uses the Cassandra driver
+ * mapper to map the row to the POJO.
+ *
+ * @param <OUT> type of POJO record to output
+ */
+class CassandraRecordEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, CassandraSplit> {
+
+ private final Function<ResultSet, OUT> map;
+
+ public CassandraRecordEmitter(Function<ResultSet, OUT> map) {
+ this.map = map;
+ }
+
+ @Override
+ public void emitRecord(
+ CassandraRow cassandraRow, SourceOutput<OUT> output, CassandraSplit cassandraSplit) {
+ // Mapping from a row to a Class<OUT> is a complex operation involving the reflection API.
+ // It is better to use the Cassandra mapper for it.
+ // But the mapper takes only a ResultSet as input, hence forging one containing only the Row.
+ ResultSet resultSet = new SingleRowResultSet(cassandraRow);
+ // output the pojo based on the cassandraRow
+ output.collect(map.apply(resultSet));
+ }
+
+ private static class SingleRowResultSet implements ResultSet {
+ private final CassandraRow cassandraRow;
+ private final Row row;
+
+ private SingleRowResultSet(CassandraRow cassandraRow) {
+ this.cassandraRow = cassandraRow;
+ this.row = cassandraRow.getRow();
+ }
+
+ @Override
+ public Row one() {
+ return row;
+ }
+
+ @Override
+ public ColumnDefinitions getColumnDefinitions() {
+ return row.getColumnDefinitions();
+ }
+
+ @Override
+ public boolean wasApplied() {
+ return true;
+ }
+
+ @Override
+ public boolean isExhausted() {
+ return true;
+ }
+
+ @Override
+ public boolean isFullyFetched() {
+ return true;
+ }
+
+ @Override
+ public int getAvailableWithoutFetching() {
+ return 1;
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> fetchMoreResults() {
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public List<Row> all() {
+ return Collections.singletonList(row);
+ }
+
+ @Override
+ public Iterator<Row> iterator() {
+ return new Iterator<Row>() {
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Row next() {
+ return row;
+ }
+ };
+ }
+
+ @Override
+ public ExecutionInfo getExecutionInfo() {
+ return cassandraRow.getExecutionInfo();
+ }
+
+ @Override
+ public List<ExecutionInfo> getAllExecutionInfo() {
+ return Collections.singletonList(cassandraRow.getExecutionInfo());
+ }
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java
new file mode 100644
index 0000000..11c078e
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import com.datastax.driver.core.ExecutionInfo;
+import com.datastax.driver.core.Row;
+
+/**
+ * Wrapper for Cassandra {@link Row} that stores the {@link ExecutionInfo} Cassandra statistics
+ * about the query execution that produced this row. {@link ExecutionInfo} is useful when using the
+ * Cassandra mapper to translate the row to a POJO.
+ */
+public class CassandraRow {
+
+ private final Row row;
+ private final ExecutionInfo executionInfo;
+
+ public CassandraRow(Row row, ExecutionInfo executionInfo) {
+ this.row = row;
+ this.executionInfo = executionInfo;
+ }
+
+ public Row getRow() {
+ return row;
+ }
+
+ public ExecutionInfo getExecutionInfo() {
+ return executionInfo;
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java
new file mode 100644
index 0000000..66eefcb
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cassandra {@link SourceReader} that reads one {@link CassandraSplit} using a single thread.
+ *
+ * @param <OUT> the type of elements produced by the source
+ */
+class CassandraSourceReader<OUT>
+ extends SingleThreadMultiplexSourceReaderBase<
+ CassandraRow, OUT, CassandraSplit, CassandraSplit> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSourceReader.class);
+
+ private final Cluster cluster;
+ private final Session session;
+
+ // created by the factory
+ CassandraSourceReader(
+ SourceReaderContext context,
+ String query,
+ String keyspace,
+ String table,
+ Cluster cluster,
+ Session session,
+ Mapper<OUT> mapper) {
+ super(
+ () -> new CassandraSplitReader(cluster, session, query, keyspace, table),
+ new CassandraRecordEmitter<>(resultSet -> mapper.map(resultSet).one()),
+ context.getConfiguration(),
+ context);
+ this.cluster = cluster;
+ this.session = session;
+ }
+
+ @Override
+ public void start() {
+ context.sendSplitRequest();
+ }
+
+ @Override
+ protected void onSplitFinished(Map<String, CassandraSplit> finishedSplitIds) {
+ context.sendSplitRequest();
+ }
+
+ @Override
+ protected CassandraSplit initializedState(CassandraSplit cassandraSplit) {
+ return cassandraSplit;
+ }
+
+ @Override
+ protected CassandraSplit toSplitType(String splitId, CassandraSplit cassandraSplit) {
+ return cassandraSplit;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java
new file mode 100644
index 0000000..3d06097
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+
+/**
+ * Factory to create {@link CassandraSourceReader}s and allow sharing the cluster and the session
+ * objects.
+ */
+public class CassandraSourceReaderFactory<OUT> {
+ public CassandraSourceReader<OUT> create(
+ SourceReaderContext context,
+ ClusterBuilder clusterBuilder,
+ Class<OUT> pojoClass,
+ String query,
+ String keyspace,
+ String table,
+ MapperOptions mapperOptions) {
+ Cluster cluster = clusterBuilder.getCluster();
+ Session session = cluster.connect();
+ Mapper<OUT> mapper = new MappingManager(session).mapper(pojoClass);
+ if (mapperOptions != null) {
+ Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
+ if (optionsArray != null) {
+ mapper.setDefaultGetOptions(optionsArray);
+ }
+ }
+ return new CassandraSourceReader<>(
+ context, query, keyspace, table, cluster, session, mapper);
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java
new file mode 100644
index 0000000..2344705
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for the Cassandra source. This class is responsible for fetching the records
+ * as {@link CassandraRow}s. For that, it executes a range query (a query that outputs records
+ * belonging to a Cassandra token range) based on the user-specified query.
+ */
+class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class);
+
+ private final Cluster cluster;
+ private final Session session;
+ private final Set<CassandraSplit> unprocessedSplits;
+ private final AtomicBoolean wakeup = new AtomicBoolean(false);
+ private final String query;
+ private final String keyspace;
+ private final String table;
+
+ public CassandraSplitReader(
+ Cluster cluster, Session session, String query, String keyspace, String table) {
+ this.unprocessedSplits = new HashSet<>();
+ this.query = query;
+ this.keyspace = keyspace;
+ this.table = table;
+ this.cluster = cluster;
+ this.session = session;
+ }
+
+ @Override
+ public RecordsWithSplitIds<CassandraRow> fetch() {
+ Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
+ Set<String> finishedSplits = new HashSet<>();
+
+ Metadata clusterMetadata = cluster.getMetadata();
+ String partitionKey = getPartitionKey(clusterMetadata);
+ String finalQuery = generateRangeQuery(query, partitionKey);
+ PreparedStatement preparedStatement = session.prepare(finalQuery);
+
+ // Set wakeup to false to start consuming
+ wakeup.compareAndSet(true, false);
+ for (CassandraSplit cassandraSplit : unprocessedSplits) {
+ // allow interrupting the reading of splits, especially the blocking session.execute()
+ // call as requested in the API
+ if (wakeup.get()) {
+ break;
+ }
+ try {
+ Token startToken =
+ clusterMetadata.newToken(cassandraSplit.getRingRangeStart().toString());
+ Token endToken =
+ clusterMetadata.newToken(cassandraSplit.getRingRangeEnd().toString());
+ final ResultSet resultSet =
+ session.execute(
+ preparedStatement
+ .bind()
+ .setToken(0, startToken)
+ .setToken(1, endToken));
+ // add all the records of the split to the output (in memory).
+ // It is safe because each split has a configurable maximum memory size
+ addRecordsToOutput(resultSet, cassandraSplit, recordsBySplit);
+ // add the already read (or even empty) split to finished splits
+ finishedSplits.add(cassandraSplit.splitId());
+ // for reentrant calls: if fetch is restarted,
+ // do not reprocess the already processed splits
+ unprocessedSplits.remove(cassandraSplit);
+ } catch (Exception ex) {
+ LOG.error("Error while reading split ", ex);
+ }
+ }
+ return new RecordsBySplits<>(recordsBySplit, finishedSplits);
+ }
+
+ private String getPartitionKey(Metadata clusterMetadata) {
+ return clusterMetadata.getKeyspace(keyspace).getTable(table).getPartitionKey().stream()
+ .map(ColumnMetadata::getName)
+ .collect(Collectors.joining(","));
+ }
+
+ @Override
+ public void wakeUp() {
+ wakeup.compareAndSet(false, true);
+ }
+
+ @Override
+ public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChanges) {
+ unprocessedSplits.addAll(splitsChanges.splits());
+ }
+
+ /**
+ * Utility method to add the ring token filtering clauses to the user query to generate the
+ * split query. For example:
+ *
+ * <ul>
+ * <li><code>"select * from
+ * keyspace.table where field1=value1;"</code> will be transformed into <code>
+ * "select * from
+ * keyspace.table where (token(partitionKey) >= ?) AND (token(partitionKey) < ?) AND
+ * field1=value1;"</code>
+ * <li><code>"select * from
+ * keyspace.table;"</code> will be transformed into <code>
+ * "select * from keyspace.table WHERE
+ * (token(%s) >= ?) AND (token(%s) < ?);"</code>
+ * </ul>
+ *
+ * @param query the user input query
+ * @param partitionKey Cassandra partition key of the user provided table
+ * @return the final split query that will be sent to the Cassandra cluster
+ */
+ @VisibleForTesting
+ static String generateRangeQuery(String query, String partitionKey) {
+ Matcher queryMatcher = CassandraSource.SELECT_REGEXP.matcher(query);
+ if (!queryMatcher.matches()) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to generate range query out of the provided query: %s", query));
+ }
+ final int whereIndex = query.toLowerCase().indexOf("where");
+ int insertionPoint;
+ String filter;
+ if (whereIndex != -1) {
+ insertionPoint = whereIndex + "where".length();
+ filter =
+ String.format(
+ " (token(%s) >= ?) AND (token(%s) < ?) AND",
+ partitionKey, partitionKey);
+ } else {
+ // end of keyspace.table
+ insertionPoint = queryMatcher.end(2);
+ filter =
+ String.format(
+ " WHERE (token(%s) >= ?) AND (token(%s) < ?)",
+ partitionKey, partitionKey);
+ }
+ return String.format(
+ "%s%s%s",
+ query.substring(0, insertionPoint), filter, query.substring(insertionPoint));
+ }
+
+ /**
+ * This method populates the {@code Map<String, Collection<CassandraRow>> recordsBySplit} map
+ * that is used to create the {@link RecordsBySplits} that are output by the fetch method. It
+ * modifies its {@code output} parameter.
+ */
+ private void addRecordsToOutput(
+ ResultSet resultSet,
+ CassandraSplit cassandraSplit,
+ Map<String, Collection<CassandraRow>> output) {
+ resultSet.forEach(
+ row ->
+ output.computeIfAbsent(cassandraSplit.splitId(), id -> new ArrayList<>())
+ .add(new CassandraRow(row, resultSet.getExecutionInfo())));
+ }
+
+ @Override
+ public void close() throws Exception {
+ // nothing to do as the cluster/session is managed by the SourceReader
+ }
+}
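A minimal sketch of the WHERE-clause injection performed by generateRangeQuery above; since the method is package-private, the sketch assumes it lives in the same package, and "ks.tbl" / "id" are placeholder names:

package org.apache.flink.connector.cassandra.source.reader;

public class RangeQuerySketch {
    public static void main(String[] args) {
        // No WHERE clause: the token filter becomes the WHERE clause.
        System.out.println(
                CassandraSplitReader.generateRangeQuery("select * from ks.tbl;", "id"));
        // prints: select * from ks.tbl WHERE (token(id) >= ?) AND (token(id) < ?);

        // Existing WHERE clause: the token filter is AND-ed in front of it.
        System.out.println(
                CassandraSplitReader.generateRangeQuery(
                        "select * from ks.tbl where field1=1;", "id"));
        // prints: select * from ks.tbl where (token(id) >= ?) AND (token(id) < ?) AND field1=1;
    }
}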
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java
new file mode 100644
index 0000000..556b87d
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+
+/**
+ * Immutable {@link SourceSplit} for the Cassandra source. A Cassandra split is a slice of the
+ * Cassandra token ring (i.e. a ringRange).
+ */
+public class CassandraSplit implements SourceSplit, Serializable {
+
+ private final BigInteger ringRangeStart;
+ private final BigInteger ringRangeEnd;
+
+ public CassandraSplit(BigInteger ringRangeStart, BigInteger ringRangeEnd) {
+ this.ringRangeStart = ringRangeStart;
+ this.ringRangeEnd = ringRangeEnd;
+ }
+
+ public BigInteger getRingRangeStart() {
+ return ringRangeStart;
+ }
+
+ public BigInteger getRingRangeEnd() {
+ return ringRangeEnd;
+ }
+
+ @Override
+ public String splitId() {
+ return String.format("(%s,%s)", ringRangeStart.toString(), ringRangeEnd.toString());
+ }
+
+ @Override
+ public String toString() {
+ return splitId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CassandraSplit other = (CassandraSplit) o;
+ return ringRangeStart.equals(other.ringRangeStart)
+ && ringRangeEnd.equals(other.ringRangeEnd);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * ringRangeStart.hashCode() + ringRangeEnd.hashCode();
+ }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
new file mode 100644
index 0000000..74fa573
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/** Serializer for {@link CassandraSplit}. */
+public class CassandraSplitSerializer implements SimpleVersionedSerializer<CassandraSplit> {
+
+ public static final CassandraSplitSerializer INSTANCE = new CassandraSplitSerializer();
+ private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+ ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+ public static final int CURRENT_VERSION = 0;
+
+ private CassandraSplitSerializer() {}
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeStart(), out);
+ BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeEnd(), out);
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
+ }
+
+ @Override
+ public CassandraSplit deserialize(int version, byte[] serialized) throws IOException {
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+ final BigInteger ringRangeStart = BigIntegerSerializationUtils.read(in);
+ final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(in);
+ return new CassandraSplit(ringRangeStart, ringRangeEnd);
+ }
+}
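And a short round-trip sketch for the split serializer above (token values are arbitrary placeholders):

import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;

import java.math.BigInteger;

public class SplitSerializerRoundTripSketch {
    public static void main(String[] args) throws Exception {
        CassandraSplit split = new CassandraSplit(BigInteger.ZERO, BigInteger.valueOf(1337));
        byte[] bytes = CassandraSplitSerializer.INSTANCE.serialize(split);
        CassandraSplit restored =
                CassandraSplitSerializer.INSTANCE.deserialize(
                        CassandraSplitSerializer.CURRENT_VERSION, bytes);
        System.out.println(split.equals(restored) && "(0,1337)".equals(restored.splitId())); // true
    }
}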
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
new file mode 100644
index 0000000..e44be34
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class prepares the generation of {@link CassandraSplit}s based on the Cassandra cluster
+ * partitioner and cluster statistics. It estimates the total size of the table using the Cassandra
+ * system table system.size_estimates.
+ */
+public final class SplitsGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class);
+
+ private final CassandraPartitioner partitioner;
+ private final Session session;
+ private final String keyspace;
+ private final String table;
+ private final int parallelism;
+ private final long maxSplitMemorySize;
+
+ public SplitsGenerator(
+ CassandraPartitioner partitioner,
+ Session session,
+ String keyspace,
+ String table,
+ int parallelism,
+ long maxSplitMemorySize) {
+ this.partitioner = partitioner;
+ this.session = session;
+ this.keyspace = keyspace;
+ this.table = table;
+ this.parallelism = parallelism;
+ this.maxSplitMemorySize = maxSplitMemorySize;
+ }
+
+ /**
+ * Prepare the {@link CassandraEnumeratorState} for lazy generation of {@link CassandraSplit}s:
+ * calculate {@code numSplitsToGenerate} based on the estimated target table size and the provided
+ * {@code maxSplitMemorySize}, and calculate {@code increment} which is the size of a split in
+ * tokens.
+ */
+ public CassandraEnumeratorState prepareSplits() {
+ final long numSplitsToGenerate = decideOnNumSplits();
+ final BigInteger increment =
+ (partitioner.ringSize).divide(new BigInteger(String.valueOf(numSplitsToGenerate)));
+ final BigInteger startToken = partitioner.minToken;
+ return new CassandraEnumeratorState(
+ numSplitsToGenerate,
+ increment,
+ startToken,
+ partitioner.maxToken,
+ new ArrayDeque<>());
+ }
+
+ /**
+ * Determine {@code numSplits} based on the estimated target table size and the configured
+ * {@code maxSplitMemorySize}. Fall back to {@code parallelism} splits when the table size is
+ * unavailable or when too few splits would be calculated.
+ */
+ private long decideOnNumSplits() {
+ long numSplits;
+ final long estimateTableSize = estimateTableSize();
+ if (estimateTableSize == 0) { // size estimates unavailable
+ LOG.info(
+ "Cassandra size estimates are not available for {}.{}
table. Creating as many splits as parallelism ({})",
+ keyspace,
+ table,
+ parallelism);
+ numSplits = parallelism;
+ } else { // create estimateTableSize / maxSplitMemorySize splits;
+ // fall back to parallelism splits if that quotient would make too few (zero) splits.
+ LOG.debug(
+ "Estimated size for {}.{} table is {} bytes",
+ keyspace,
+ table,
+ estimateTableSize);
+ numSplits =
+ estimateTableSize / maxSplitMemorySize == 0
+ ? parallelism
+ : estimateTableSize / maxSplitMemorySize;
+ LOG.info(
+ "maxSplitMemorySize set value ({}) leads to the creation
of {} splits",
+ maxSplitMemorySize,
+ numSplits);
+ }
+ return numSplits;
+ }
+
+ /**
+ * Estimates the size of the table in bytes. Cassandra size estimates can be 0 if the data was
+ * just inserted and the amount of data in the table is small. This is a very common situation
+ * during tests.
+ */
+ @VisibleForTesting
+ public long estimateTableSize() {
+ List<TokenRange> tokenRanges = getTokenRangesOfTable();
+ long size = 0L;
+ for (TokenRange tokenRange : tokenRanges) {
+ size += tokenRange.meanPartitionSize * tokenRange.partitionCount;
+ }
+ final float ringFraction = getRingFraction(tokenRanges);
+ // ringFraction can be 0 if the size estimates are not available
+ return ringFraction != 0 ? Math.round(size / ringFraction) : 0L;
+ }
+
+ /**
+ * The values that we get from system.size_estimates are for one node. We need to extrapolate
+ * to the whole cluster. This method estimates the fraction of the ring that the node
+ * represents in the cluster.
+ *
+ * @param tokenRanges The list of {@link TokenRange} to estimate
+ * @return The fraction of the whole cluster's ring that the node represents
+ */
+ private float getRingFraction(List<TokenRange> tokenRanges) {
+ BigInteger addressedTokens = BigInteger.ZERO;
+ for (TokenRange tokenRange : tokenRanges) {
+ addressedTokens =
+ addressedTokens.add(distance(tokenRange.rangeStart,
tokenRange.rangeEnd));
+ }
+ // the result is <= 1 because it is a fraction of the ring;
+ // use floating point division, as BigInteger division would truncate the fraction to 0
+ return addressedTokens.floatValue() / partitioner.ringSize.floatValue();
+ }
+
+ /** Gets the list of token ranges that the table occupies on a given
Cassandra node. */
+ private List<TokenRange> getTokenRangesOfTable() {
+ ResultSet resultSet =
+ session.execute(
+ "SELECT range_start, range_end, partitions_count,
mean_partition_size FROM "
+ + "system.size_estimates WHERE keyspace_name =
? AND table_name = ?",
+ keyspace,
+ table);
+
+ ArrayList<TokenRange> tokenRanges = new ArrayList<>();
+ for (Row row : resultSet) {
+ TokenRange tokenRange =
+ new TokenRange(
+ row.getLong("partitions_count"),
+ row.getLong("mean_partition_size"),
+ row.getString("range_start"),
+ row.getString("range_end"));
+ tokenRanges.add(tokenRange);
+ }
+ // The table may not contain the estimates yet
+ // or have partitions_count and mean_partition_size fields = 0
+ // if the data was just inserted and the amount of data in the table
was small.
+ // This is a very common situation during tests,
+ // when we insert a few rows and immediately query them.
+ // However, for tiny data sets the lack of size estimates is not a
problem at all,
+ // because we don't want to split tiny data anyways.
+ // Therefore, we're not issuing a warning if the result set was empty
+ // or mean_partition_size and partitions_count = 0.
+ return tokenRanges;
+ }
+
+ /**
+ * Measure distance between two tokens.
+ *
+ * @param token1 The measure is symmetrical so token1 and token2 can be
exchanged
+ * @param token2 The measure is symmetrical so token1 and token2 can be
exchanged
+ * @return Number of tokens that separate token1 and token2
+ */
+ private BigInteger distance(BigInteger token1, BigInteger token2) {
+ // token2 > token1
+ if (token2.compareTo(token1) > 0) {
+ return token2.subtract(token1);
+ } else {
+ return token2.subtract(token1).add(partitioner.ringSize);
+ }
+ }
+
+ /** Enum to configure the {@link SplitsGenerator} based on Apache Cassandra partitioners. */
+ public enum CassandraPartitioner {
+ MURMUR3PARTITIONER(
+ "Murmur3Partitioner",
+ BigInteger.valueOf(2).pow(63).negate(),
+ BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE)),
+ RANDOMPARTITIONER(
+ "RandomPartitioner",
+ BigInteger.ZERO,
+ BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE));
+
+ private final BigInteger minToken;
+ private final BigInteger maxToken;
+ private final BigInteger ringSize;
+ private final String className;
+
+ CassandraPartitioner(String className, BigInteger minToken, BigInteger
maxToken) {
+ this.className = className;
+ this.minToken = minToken;
+ this.maxToken = maxToken;
+ this.ringSize = maxToken.subtract(minToken).add(BigInteger.ONE);
+ }
+
+ public String getClassName() {
+ return className;
+ }
+ }
+
+ private static class TokenRange {
+ private final long partitionCount;
+ private final long meanPartitionSize;
+ private final BigInteger rangeStart;
+ private final BigInteger rangeEnd;
+
+ private TokenRange(
+ long partitionCount, long meanPartitionSize, String
rangeStart, String rangeEnd) {
+ this.partitionCount = partitionCount;
+ this.meanPartitionSize = meanPartitionSize;
+ this.rangeStart = new BigInteger(rangeStart);
+ this.rangeEnd = new BigInteger(rangeEnd);
+ }
+ }
+}
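For reference, the sizing arithmetic implemented by decideOnNumSplits() and prepareSplits() above can be reproduced in isolation: numSplits is estimateTableSize / maxSplitMemorySize (falling back to parallelism when the quotient is 0 or the estimate is unavailable), and each split covers ringSize / numSplits tokens. The following is a minimal standalone sketch of that arithmetic; the table-size and memory-size constants are illustrative values, not taken from this commit.

import java.math.BigInteger;

public class SplitSizingSketch {
    public static void main(String[] args) {
        // Murmur3Partitioner ring: [-2^63, 2^63 - 1], so ringSize = 2^64
        BigInteger minToken = BigInteger.valueOf(2).pow(63).negate();
        BigInteger maxToken = BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE);
        BigInteger ringSize = maxToken.subtract(minToken).add(BigInteger.ONE);

        long estimateTableSize = 35_840L;  // illustrative table size in bytes
        long maxSplitMemorySize = 10_000L; // illustrative max split size in bytes
        int parallelism = 2;

        // same fallback rule as decideOnNumSplits(): use parallelism when the quotient is 0
        long numSplits =
                estimateTableSize / maxSplitMemorySize == 0
                        ? parallelism
                        : estimateTableSize / maxSplitMemorySize;

        // each split covers ringSize / numSplits tokens, as in prepareSplits()
        BigInteger increment = ringSize.divide(BigInteger.valueOf(numSplits));

        System.out.println("numSplits = " + numSplits); // 3
        System.out.println("increment = " + increment); // roughly a third of the ring
    }
}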
diff --git
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java
new file mode 100644
index 0000000..74f25d0
--- /dev/null
+++
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+
+/** Utils for reading and writing {@link BigInteger} in a serde context. */
+public class BigIntegerSerializationUtils {
+ public static void write(BigInteger bigInteger, DataOutput output) throws
IOException {
+ final byte[] bigIntegerBytes = bigInteger.toByteArray();
+ output.writeInt(bigIntegerBytes.length);
+ output.write(bigIntegerBytes);
+ }
+
+ public static BigInteger read(DataInput input) throws IOException {
+ final int bigIntegerSize = input.readInt();
+ final byte[] bigIntegerBytes = new byte[bigIntegerSize];
+ input.readFully(bigIntegerBytes);
+ return new BigInteger(bigIntegerBytes);
+ }
+}
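The helper above writes a BigInteger as a length-prefixed two's-complement byte array and reads it back symmetrically. A quick roundtrip sketch using plain JDK streams follows (the connector itself goes through Flink's DataOutputSerializer/DataInputDeserializer, which implement the same DataOutput/DataInput interfaces); the snippet is illustrative only:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.math.BigInteger;

import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;

public class BigIntegerSerdeSketch {
    public static void main(String[] args) throws Exception {
        BigInteger token = BigInteger.valueOf(2).pow(63).negate(); // Murmur3 min token

        // write: length-prefixed two's-complement byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BigIntegerSerializationUtils.write(token, new DataOutputStream(bytes));

        // read it back
        BigInteger copy =
                BigIntegerSerializationUtils.read(
                        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(token.equals(copy)); // true
    }
}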
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index 764c001..441cc09 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
new file mode 100644
index 0000000..83ecaae
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+ @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new
MiniClusterTestEnvironment();
+
+ @TestExternalSystem
+ CassandraTestEnvironment cassandraTestEnvironment = new
CassandraTestEnvironment(true);
+
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
+ @TestContext
+ CassandraTestContextFactory contextFactory =
+ new CassandraTestContextFactory(cassandraTestEnvironment);
+
+ @TestTemplate
+ @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default
Cassandra partitioner)")
+ public void testGenerateSplitsMurMur3Partitioner(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic) {
+ final int parallelism = 2;
+ SplitsGenerator generator =
+ new SplitsGenerator(
+ MURMUR3PARTITIONER,
+ cassandraTestEnvironment.getSession(),
+ CassandraTestEnvironment.KEYSPACE,
+ CassandraTestEnvironment.SPLITS_TABLE,
+ parallelism,
+ CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+ final CassandraEnumeratorState state = generator.prepareSplits();
+
+ // maxSplitMemorySize not set (default used), so the number of splits falls back to parallelism
+ assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+ final CassandraSplit split1 = state.getNextSplit();
+ checkNotNull(split1, "No splits left to generate in
CassandraEnumeratorState");
+ assertThat(split1.splitId()).isEqualTo("(-9223372036854775808,0)");
+
+ final CassandraSplit split2 = state.getNextSplit();
+ checkNotNull(split2, "No splits left to generate in
CassandraEnumeratorState");
+ assertThat(split2.splitId()).isEqualTo("(0,9223372036854775807)");
+ }
+
+ @TestTemplate
+ @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+ public void testGenerateSplitsRandomPartitioner(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic) {
+ final int parallelism = 2;
+ final SplitsGenerator generator =
+ new SplitsGenerator(
+ RANDOMPARTITIONER,
+ cassandraTestEnvironment.getSession(),
+ CassandraTestEnvironment.KEYSPACE,
+ CassandraTestEnvironment.SPLITS_TABLE,
+ parallelism,
+ CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+ final CassandraEnumeratorState state = generator.prepareSplits();
+
+ // maxSplitMemorySize not set (default used), so the number of splits falls back to parallelism
+ assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+ final CassandraSplit split1 = state.getNextSplit();
+ checkNotNull(split1, "No splits left to generate in
CassandraEnumeratorState");
+
assertThat(split1.splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+
+ final CassandraSplit split2 = state.getNextSplit();
+ checkNotNull(split2, "No splits left to generate in
CassandraEnumeratorState");
+ assertThat(split2.splitId())
+ .isEqualTo(
+
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+ }
+
+ @TestTemplate
+ @DisplayName("Test splitting with a correct split size set")
+ public void testGenerateSplitsWithCorrectSize(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ final int parallelism = 2;
+ final long maxSplitMemorySize = 10000L;
+ final SplitsGenerator generator =
+ new SplitsGenerator(
+ MURMUR3PARTITIONER,
+ cassandraTestEnvironment.getSession(),
+ CassandraTestEnvironment.KEYSPACE,
+ CassandraTestEnvironment.SPLITS_TABLE,
+ parallelism,
+ maxSplitMemorySize);
+ final long tableSize = generator.estimateTableSize();
+ // sanity check to ensure that the size estimates were updated in the
Cassandra cluster
+ assertThat(tableSize).isEqualTo(35840L);
+ final CassandraEnumeratorState cassandraEnumeratorState =
generator.prepareSplits();
+ assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate())
+ // regular case
+ .isEqualTo(tableSize / maxSplitMemorySize);
+ }
+
+ @TestTemplate
+ @DisplayName("Test splitting with a too big split size set")
+ public void testGenerateSplitsWithTooHighMaximumSplitSize(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ final int parallelism = 20;
+ final SplitsGenerator generator =
+ new SplitsGenerator(
+ MURMUR3PARTITIONER,
+ cassandraTestEnvironment.getSession(),
+ CassandraTestEnvironment.KEYSPACE,
+ CassandraTestEnvironment.SPLITS_TABLE,
+ parallelism,
+ 100_000_000L);
+ // sanity check to ensure that the size estimates were updated in the
Cassandra cluster
+ assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+ final CassandraEnumeratorState cassandraEnumeratorState =
generator.prepareSplits();
+ // maxSplitMemorySize is too high compared to the table size, so we fall back to parallelism splits.
+ // A too low maxSplitMemorySize is guarded by an assertion (> min) at source creation.
+
assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+ }
+
+ // overridden to use unordered checks
+ @Override
+ protected void checkResultWithSemantic(
+ CloseableIterator<Pojo> resultIterator,
+ List<List<Pojo>> testData,
+ CheckpointingMode semantic,
+ Integer limit) {
+ if (limit != null) {
+ Runnable runnable =
+ () ->
+
CollectIteratorAssertions.assertUnordered(resultIterator)
+ .withNumRecordsLimit(limit)
+ .matchesRecordsFromSource(testData,
semantic);
+
+
assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+ } else {
+ CollectIteratorAssertions.assertUnordered(resultIterator)
+ .matchesRecordsFromSource(testData, semantic);
+ }
+ }
+
+ @Disabled("Not a unbounded source")
+ @Override
+ public void testSourceMetrics(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {}
+
+ @Disabled("Not a unbounded source")
+ @Override
+ public void testSavepoint(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic) {}
+
+ @Disabled("Not a unbounded source")
+ @Override
+ public void testScaleUp(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic) {}
+
+ @Disabled("Not a unbounded source")
+ @Override
+ public void testScaleDown(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ CheckpointingMode semantic) {}
+
+ @Disabled("Not a unbounded source")
+ @Override
+ public void testTaskManagerFailure(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<Pojo> externalContext,
+ ClusterControllable controller,
+ CheckpointingMode semantic) {}
+}
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
new file mode 100644
index 0000000..fb69f2b
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+import
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * JUnit {@link DataStreamSourceExternalContext} that contains everything related to Cassandra
+ * source test cases, especially test table management.
+ */
+public class CassandraTestContext implements
DataStreamSourceExternalContext<Pojo> {
+
+ static final String TABLE_NAME = "batches";
+
+ private static final String CREATE_TABLE_QUERY =
+ "CREATE TABLE "
+ + CassandraTestEnvironment.KEYSPACE
+ + "."
+ + TABLE_NAME
+ + " (id text PRIMARY KEY, counter int, batch_id int)"
+ + ";";
+
+ private static final String DROP_TABLE_QUERY =
+ "DROP TABLE " + CassandraTestEnvironment.KEYSPACE + "." +
TABLE_NAME + ";";
+
+ private static final int RECORDS_PER_SPLIT = 20;
+
+ private final Mapper<Pojo> mapper;
+ private final MapperOptions mapperOptions;
+ private final ClusterBuilder clusterBuilder;
+ private final Session session;
+ private ExternalSystemSplitDataWriter<Pojo> splitDataWriter;
+
+ public CassandraTestContext(CassandraTestEnvironment
cassandraTestEnvironment) {
+ clusterBuilder = cassandraTestEnvironment.getClusterBuilder();
+ session = cassandraTestEnvironment.getSession();
+ createTable();
+ mapper = new
MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class);
+ mapperOptions = () -> new Mapper.Option[]
{Mapper.Option.saveNullFields(true)};
+ }
+
+ @Override
+ public TypeInformation<Pojo> getProducedType() {
+ return TypeInformation.of(Pojo.class);
+ }
+
+ @Override
+ public List<URL> getConnectorJarPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Source<Pojo, ?, ?> createSource(TestingSourceSettings
sourceSettings)
+ throws UnsupportedOperationException {
+
+ return new CassandraSource<>(
+ clusterBuilder,
+ Pojo.class,
+ String.format(
+ "SELECT * FROM %s.%s;",
CassandraTestEnvironment.KEYSPACE, TABLE_NAME),
+ mapperOptions);
+ }
+
+ @Override
+ public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter(
+ TestingSourceSettings sourceSettings) {
+ splitDataWriter =
+ new ExternalSystemSplitDataWriter<Pojo>() {
+
+ @Override
+ public void writeRecords(List<Pojo> records) {
+ for (Pojo pojo : records) {
+ mapper.save(pojo,
mapperOptions.getMapperOptions());
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do, cluster/session is shared at the
CassandraTestEnvironment
+ // level
+ }
+ };
+ return splitDataWriter;
+ }
+
+ @Override
+ public List<Pojo> generateTestData(
+ TestingSourceSettings sourceSettings, int splitIndex, long seed) {
+ List<Pojo> testData = new ArrayList<>(RECORDS_PER_SPLIT);
+ // generate RECORDS_PER_SPLIT pojos per split and use splitIndex as the pojo batchID so that
+ // pojos are considered equal when they belong to the same split,
+ // as requested in the implementation notes.
+ for (int i = 0; i < RECORDS_PER_SPLIT; i++) {
+ Pojo pojo = new Pojo(String.valueOf(seed + i), i, splitIndex);
+ testData.add(pojo);
+ }
+ return testData;
+ }
+
+ @Override
+ public void close() throws Exception {
+ dropTable();
+ // NB: cluster/session is shared at the CassandraTestEnvironment level
+ }
+
+ private void createTable() {
+
session.execute(CassandraTestEnvironment.requestWithTimeout(CREATE_TABLE_QUERY));
+ }
+
+ private void dropTable() {
+
session.execute(CassandraTestEnvironment.requestWithTimeout(DROP_TABLE_QUERY));
+ }
+
+ static class CassandraTestContextFactory
+ implements ExternalContextFactory<CassandraTestContext> {
+
+ private final CassandraTestEnvironment cassandraTestEnvironment;
+
+ public CassandraTestContextFactory(CassandraTestEnvironment
cassandraTestEnvironment) {
+ this.cassandraTestEnvironment = cassandraTestEnvironment;
+ }
+
+ @Override
+ public CassandraTestContext createExternalContext(String testName) {
+ return new CassandraTestContext(cassandraTestEnvironment);
+ }
+ }
+}
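The createSource implementation above mirrors how a user job would instantiate the source; wiring the same constructor into a DataStream pipeline could look roughly like the sketch below. The contact point and query are hypothetical placeholders assembled from the arguments shown in this test context, not values defined elsewhere in this commit.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.cassandra.source.CassandraSource;
import org.apache.flink.connectors.cassandra.utils.Pojo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.mapping.Mapper;

public class CassandraSourceUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical contact point; replace with a real Cassandra cluster address
        ClusterBuilder clusterBuilder =
                new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder.addContactPoint("127.0.0.1").build();
                    }
                };
        MapperOptions mapperOptions =
                () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)};

        CassandraSource<Pojo> source =
                new CassandraSource<>(
                        clusterBuilder,
                        Pojo.class,
                        "SELECT * FROM flink.batches;",
                        mapperOptions);

        DataStream<Pojo> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "CassandraSource");
        stream.print();
        env.execute("Cassandra source sketch");
    }
}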
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
new file mode 100644
index 0000000..24b9e60
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * JUnit test environment that contains everything needed at the test suite level: test container
+ * setup, keyspace setup, Cassandra cluster/session management, and ClusterBuilder setup.
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+ private static final Logger LOG =
LoggerFactory.getLogger(CassandraTestEnvironment.class);
+ private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+ private static final int CQL_PORT = 9042;
+
+ private static final int READ_TIMEOUT_MILLIS = 36000;
+
+ // flushing memtables to SSTables is an asynchronous operation that may take a while
+ private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
+
+ static final String KEYSPACE = "flink";
+
+ private static final String CREATE_KEYSPACE_QUERY =
+ "CREATE KEYSPACE "
+ + KEYSPACE
+ + " WITH replication= {'class':'SimpleStrategy',
'replication_factor':1};";
+
+ static final String SPLITS_TABLE = "flinksplits";
+ private static final String CREATE_SPLITS_TABLE_QUERY =
+ "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int
PRIMARY KEY, counter int);";
+ private static final String INSERT_INTO_FLINK_SPLITS =
+ "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)"
+ " VALUES (%d, %d)";
+ private static final int NB_SPLITS_RECORDS = 1000;
+
+ @Container private final CassandraContainer cassandraContainer;
+
+ boolean insertTestDataForSplitSizeTests;
+ private Cluster cluster;
+ private Session session;
+ private ClusterBuilder clusterBuilder;
+
+ public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
+ this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
+ cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+ // more generous timeouts
+ addJavaOpts(
+ cassandraContainer,
+ "-Dcassandra.request_timeout_in_ms=30000",
+ "-Dcassandra.read_request_timeout_in_ms=15000",
+ "-Dcassandra.write_request_timeout_in_ms=6000");
+ }
+
+ @Override
+ public void startUp() throws Exception {
+ startEnv();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ stopEnv();
+ }
+
+ private static void addJavaOpts(GenericContainer<?> container, String...
opts) {
+ String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+ container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, "
"));
+ }
+
+ private void startEnv() throws Exception {
+ // configure container start to wait until cassandra is ready to
receive queries
+ cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+ // start with retries
+ cassandraContainer.start();
+ cassandraContainer.followOutput(
+ new Slf4jLogConsumer(LOG),
+ OutputFrame.OutputType.END,
+ OutputFrame.OutputType.STDERR,
+ OutputFrame.OutputType.STDOUT);
+
+ cluster = cassandraContainer.getCluster();
+ clusterBuilder =
+ createBuilderWithConsistencyLevel(
+ ConsistencyLevel.ONE,
+ cassandraContainer.getHost(),
+ cassandraContainer.getMappedPort(CQL_PORT));
+
+ session = cluster.connect();
+ session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+ // create a dedicated table for split size tests (to avoid having to
flush with each test)
+ if (insertTestDataForSplitSizeTests) {
+ insertTestDataForSplitSizeTests();
+ }
+ }
+
+ private void insertTestDataForSplitSizeTests() throws Exception {
+ session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+ for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+
session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i,
i)));
+ }
+ flushMemTables(SPLITS_TABLE);
+ }
+
+ private void stopEnv() {
+
+ if (session != null) {
+ session.close();
+ }
+ if (cluster != null) {
+ cluster.close();
+ }
+ cassandraContainer.stop();
+ }
+
+ private ClusterBuilder createBuilderWithConsistencyLevel(
+ ConsistencyLevel consistencyLevel, String host, int port) {
+ return new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Cluster.Builder builder) {
+ return builder.addContactPointsWithPorts(new
InetSocketAddress(host, port))
+ .withQueryOptions(
+ new QueryOptions()
+ .setConsistencyLevel(consistencyLevel)
+
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+ .withSocketOptions(
+ new SocketOptions()
+ // default timeout x 3
+ .setConnectTimeoutMillis(15000)
+ // default timeout x3 and higher than
+ // request_timeout_in_ms at the
cluster level
+
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+ .withoutJMXReporting()
+ .withoutMetrics()
+ .build();
+ }
+ };
+ }
+
+ /**
+ * Force the flush of Cassandra memtables to SSTables in order to update size_estimates. This is
+ * needed for the tests because we just inserted records and need to force Cassandra to update
+ * the size_estimates system table.
+ */
+ void flushMemTables(String table) throws Exception {
+ cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE,
table);
+ Thread.sleep(FLUSH_MEMTABLES_DELAY);
+ }
+
+ static Statement requestWithTimeout(String query) {
+ return new
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
+ }
+
+ public ClusterBuilder getClusterBuilder() {
+ return clusterBuilder;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+}
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java
new file mode 100644
index 0000000..0a4a3ec
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraEnumeratorStateSerializer}. */
+class CassandraEnumeratorStateSerializerTest {
+
+ @Test
+ public void testSerdeRoundtrip() throws Exception {
+ final Queue<CassandraSplit> splitsToReassign =
+ new ArrayDeque<>(
+ ImmutableList.of(
+ new CassandraSplit(BigInteger.ZERO,
BigInteger.TEN),
+ new CassandraSplit(BigInteger.TEN,
BigInteger.ZERO)));
+
+ final CassandraEnumeratorState cassandraEnumeratorState =
+ new CassandraEnumeratorState(
+ 10, BigInteger.ONE, BigInteger.ZERO, BigInteger.TEN,
splitsToReassign);
+
+ final byte[] serialized =
+
CassandraEnumeratorStateSerializer.INSTANCE.serialize(cassandraEnumeratorState);
+ final CassandraEnumeratorState deserialized =
+ CassandraEnumeratorStateSerializer.INSTANCE.deserialize(
+ CassandraEnumeratorStateSerializer.CURRENT_VERSION,
serialized);
+ assertThat(deserialized)
+ .isEqualTo(cassandraEnumeratorState)
+ .withFailMessage(
+ "CassandraEnumeratorState is not the same as input
object after serde roundtrip");
+ }
+}
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
new file mode 100644
index 0000000..649eeca
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+ @Test
+ public void testKeySpaceTableExtractionRegexp() {
+ Arrays.asList(
+ "select field FROM keyspace.table where field =
value;",
+ "select * FROM keyspace.table;",
+ "select field1, field2 from keyspace.table;",
+ "select field1, field2 from keyspace.table
LIMIT(1000);",
+ "select field1 from keyspace.table ;",
+ "select field1 from keyspace.table where field1=1;")
+ .forEach(CassandraQueryTest::assertQueryFormatCorrect);
+
+ Arrays.asList(
+ "select field1 from table;", // missing keyspace
+ "select field1 from .table", // undefined keyspace var
in a script
+ "select field1 from keyspace.;", // undefined table
var in a script
+ "select field1 from keyspace.table" // missing ";"
+ )
+ .forEach(CassandraQueryTest::assertQueryFormatIncorrect);
+ }
+
+ @Test
+ public void testProhibitedClauses() {
+ Arrays.asList(
+ "SELECT COUNT(*) from flink.table;",
+ "SELECT AVG(*) from flink.table;",
+ "SELECT MIN(*) from flink.table;",
+ "SELECT MAX(*) from flink.table;",
+ "SELECT SUM(*) from flink.table;",
+ "SELECT field1, field2 from flink.table ORDER BY
field1;",
+ "SELECT field1, field2 from flink.table GROUP BY
field1;")
+ .forEach(CassandraQueryTest::assertProhibitedClauseRejected);
+ }
+
+ @Test
+ public void testGenerateRangeQuery() {
+ String query;
+ String outputQuery;
+
+ // query with where clause
+ query = "SELECT field FROM keyspace.table WHERE field = value;";
+ outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+ assertThat(outputQuery)
+ .isEqualTo(
+ "SELECT field FROM keyspace.table WHERE (token(field)
>= ?) AND (token(field) < ?) AND field = value;");
+
+ // query without where clause
+ query = "SELECT * FROM keyspace.table;";
+ outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+ assertThat(outputQuery)
+ .isEqualTo(
+ "SELECT * FROM keyspace.table WHERE (token(field) >=
?) AND (token(field) < ?);");
+
+ // query without where clause but with another trailing clause
+ query = "SELECT field FROM keyspace.table LIMIT(1000);";
+ outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+ assertThat(outputQuery)
+ .isEqualTo(
+ "SELECT field FROM keyspace.table WHERE (token(field)
>= ?) AND (token(field) < ?) LIMIT(1000);");
+
+ // query with where clause and another trailing clause
+ query = "SELECT field FROM keyspace.table WHERE field = value
LIMIT(1000);";
+ outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+ assertThat(outputQuery)
+ .isEqualTo(
+ "SELECT field FROM keyspace.table WHERE (token(field)
>= ?) AND (token(field) < ?) AND field = value LIMIT(1000);");
+ }
+
+ private static void assertQueryFormatIncorrect(String query) {
+ assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query))
+ .hasMessageContaining(
+ "Query must be of the form select ... from
keyspace.table ...;");
+ }
+
+ private static void assertQueryFormatCorrect(String query) {
+ Matcher matcher = CassandraSource.SELECT_REGEXP.matcher(query);
+ assertThat(matcher.matches()).isTrue();
+ assertThat(matcher.group(1)).isEqualTo("keyspace");
+ assertThat(matcher.group(2)).isEqualTo("table");
+ }
+
+ private static void assertProhibitedClauseRejected(String query) {
+ assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query))
+ .hasMessageContaining(
+ "Aggregations/OrderBy are not supported because the
query is executed on subsets/partitions of the input table");
+ }
+}
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
new file mode 100644
index 0000000..b79fba6
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraSplitSerializer}. */
+class CassandraSplitSerializerTest {
+
+ @Test
+ public void testSerdeRoundtrip() throws IOException {
+ final CassandraSplit testData = new CassandraSplit(BigInteger.ONE,
BigInteger.TEN);
+ final byte[] serialized =
CassandraSplitSerializer.INSTANCE.serialize(testData);
+ final CassandraSplit deserialized =
+ CassandraSplitSerializer.INSTANCE.deserialize(
+ CassandraSplitSerializer.CURRENT_VERSION, serialized);
+ assertThat(deserialized)
+ .isEqualTo(testData)
+ .withFailMessage(
+ "CassandraSplit is not the same as input object after
serde roundtrip");
+ }
+}
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
similarity index 73%
rename from
flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
rename to
flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
index 559f107..a32e492 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.flink.batch.connectors.cassandra.example;
+package org.apache.flink.connectors.cassandra.utils;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
+import java.util.Objects;
/** Test Pojo with DataStax annotations used. */
@Table(keyspace = "flink", name = "batches")
@@ -69,4 +70,27 @@ public class Pojo implements Serializable {
public void setBatchID(int batchId) {
this.batchID = batchId;
}
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{\"id\":\"%s\", \"counter\":%d, \"batchID\":%d}", id,
counter, batchID);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Pojo pojo = (Pojo) o;
+ return counter == pojo.counter && batchID == pojo.batchID &&
id.equals(pojo.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, counter, batchID);
+ }
}
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 8cf47a1..770d68b 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,7 +25,7 @@ under the License.
<suppressions>
<!-- Cassandra connectors have to use guava directly -->
<suppress
-
files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java"
+
files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java|CassandraRecordEmitter"
checks="IllegalImport"/>
</suppressions>