zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1091859558


##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/SplitsGeneratorTest.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SplitsGenerator}. */
+public final class SplitsGeneratorTest {

Review Comment:
   ```suggestion
   final class SplitsGeneratorTest {
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+    @Test
+    public void testKeySpaceTableExtractionRegexp() {
+        final Pattern pattern = 
Pattern.compile(CassandraSplitReader.SELECT_REGEXP);
+        Matcher matcher;
+        matcher = pattern.matcher("SELECT field FROM keyspace.table where 
field = value;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("SELECT * FROM keyspace.table;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1, field2 from 
keyspace.table;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1, field2 from keyspace.table 
LIMIT(1000);");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from keyspace.table ;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from keyspace.table where 
field1=1;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from table;"); // missing 
keyspace
+        assertThat(matcher.matches()).isFalse();
+
+        matcher = pattern.matcher("select field1 from keyspace.table"); // 
missing ";"
+        assertThat(matcher.matches()).isFalse();
+    }
+
+    @Test
+    public void testProhibitedClauses() {
+        assertThatThrownBy(
+                        () ->
+                                CassandraSource.checkQueryValidity(
+                                        "SELECT COUNT(*) from flink.table;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+        assertThatThrownBy(
+                        () -> CassandraSource.checkQueryValidity("SELECT 
AVG(*) from flink.table;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+
+        assertThatThrownBy(
+                        () -> CassandraSource.checkQueryValidity("SELECT 
MIN(*) from flink.table;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+        assertThatThrownBy(
+                        () -> CassandraSource.checkQueryValidity("SELECT 
MAX(*) from flink.table;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+        assertThatThrownBy(
+                        () -> CassandraSource.checkQueryValidity("SELECT 
SUM(*) from flink.table;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+        assertThatThrownBy(
+                        () ->
+                                CassandraSource.checkQueryValidity(
+                                        "SELECT field1, field2 from 
flink.table ORDER BY field1;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");
+        assertThatThrownBy(
+                        () ->
+                                CassandraSource.checkQueryValidity(
+                                        "SELECT field1, field2 from 
flink.table GROUP BY field1;"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("query must not contain aggregate or 
order clauses ");

Review Comment:
   You could make this a bit more readable by adding another method that 
does the assertions, and in here you do something like:
   ```
   Stream.of(
       "SELECT COUNT(*) from flink.table;", 
       ...)
     .forEach(CassandraQueryTest::assertProhibitedClauseRejected);
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -41,139 +41,31 @@ public SplitsGenerator(CassandraPartitioner partitioner) {
     }
 
     /**
-     * Given properly ordered list of Cassandra tokens, compute at least 
{@code totalSplitCount}
-     * splits. Each split can contain several token ranges in order to reduce 
the overhead of
-     * Cassandra vnodes. Currently, token range grouping is not smart and 
doesn't check if they
-     * share the same replicas.
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the ring.
      *
-     * @param totalSplitCount requested total amount of splits. This function 
may generate more
-     *     splits.
-     * @param ringTokens list of all start tokens in Cassandra cluster. They 
have to be in ring
-     *     order.
-     * @return list containing at least {@code totalSplitCount} 
CassandraSplits.
+     * @param numSplits requested number of splits
+     * @return list containing {@code numSplits} CassandraSplits.
      */
-    public List<CassandraSplit> generateSplits(long totalSplitCount, 
List<BigInteger> ringTokens) {
-        if (totalSplitCount == 1) {
-            RingRange totalRingRange = RingRange.of(partitioner.min(), 
partitioner.max());
+    public List<CassandraSplit> generateSplits(long numSplits) {

Review Comment:
   AFAICT this could be a static method; there isn't really a benefit in having 
a `SplitsGenerator` object. It's only created, used exactly once, and then 
discarded right away, all happening in the same method.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to 
assign. */
+public class CassandraEnumeratorState {
+    private final Queue<CassandraSplit> unassignedSplits;
+
+    public CassandraEnumeratorState() {
+        this.unassignedSplits = new ArrayDeque<>();
+    }
+
+    public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) {
+        this.unassignedSplits = unassignedSplits;
+    }
+
+    public void addNewSplits(Collection<CassandraSplit> newSplits) {
+        unassignedSplits.addAll(newSplits);
+    }
+
+    public CassandraSplit getASplit() {

Review Comment:
   add `@Nullable`



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to 
assign. */
+public class CassandraEnumeratorState {
+    private final Queue<CassandraSplit> unassignedSplits;
+
+    public CassandraEnumeratorState() {
+        this.unassignedSplits = new ArrayDeque<>();
+    }
+
+    public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) {
+        this.unassignedSplits = unassignedSplits;

Review Comment:
   add `checkNotNull`



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitState.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import com.datastax.driver.core.ResultSet;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+
+/**
+ * Mutable {@link CassandraSplit} that keeps track of the reading process of 
the associated split.
+ */
+public class CassandraSplitState {
+    private final CassandraSplit cassandraSplit;
+    // Cassandra ResultSet is paginated, a new page is read only if all the 
records of the previous
+    // one were consumed. fetch() can be interrupted so we use the resultSet 
to keep track of the
+    // reading process.
+    // It is null when reading has not started (before fetch is called on the 
split).
+    @Nullable private ResultSet resultSet;

Review Comment:
   reminder: Curious how this is meant to persist interrupted fetches with 
checkpoints in-between fetches. Risk of duplicating input.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, 
CassandraSplit> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+    public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+    private final Cluster cluster;
+    private final Session session;
+    private final Set<CassandraSplitState> unprocessedSplits;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+    private final String query;
+
+    public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+        // need a thread safe set
+        this.unprocessedSplits = ConcurrentHashMap.newKeySet();
+        this.query = query;
+        cluster = clusterBuilder.getCluster();
+        session = cluster.connect();
+    }
+
+    @Override
+    public RecordsWithSplitIds<CassandraRow> fetch() {
+        Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
+        Set<String> finishedSplits = new HashSet<>();
+        Metadata clusterMetadata = cluster.getMetadata();
+
+        String partitionKey = getPartitionKey(clusterMetadata);
+        String finalQuery = generateRangeQuery(query, partitionKey);
+        PreparedStatement preparedStatement = session.prepare(finalQuery);
+        // Set wakeup to false to start consuming.
+        wakeup.compareAndSet(true, false);
+        for (CassandraSplitState cassandraSplitState : unprocessedSplits) {

Review Comment:
   > I added MAX_RECORDS_PER_SPLIT conf parameter as proposed above because I 
did not know if such parameter existed already. Feel
   
   I'll look around. The naming seems off though; it's not a maximum number of 
records for a split (which would be more relevant for the `SplitsGenerator`), 
but more of a `batch size`.
   
   > So, I could store in the CassandraSplitState a reference to the ResultSet 
to resume the output of remaining records on a later fetch().
   
   You also have to consider how this information can be stored in a 
checkpoint. A checkpoint may happen between 2 fetches for 1 split (i.e. the 
first half of the emitted records will be remembered by Flink), and then the 
job is restarted. As-is you'd read the first half again, because your split 
state isn't written to checkpoints.
   
   Not sure how to solve that. If Cassandra has a deterministic order in which 
it returns records (it probably doesn't :see_no_evil: ) then you could remember 
the count and skip that many records, or adjust the start token of the split.
   
   Alternatively we could consider handling this in the `SplitsGenerator`, 
generating as many splits as necessary to keep at most N records in memory in 
each split reader. (We'd never have the case that 1 split contains the entire 
table.)



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraSplitSerializer}. */
+public class CassandraSplitSerializerTest {

Review Comment:
   ```suggestion
   class CassandraSplitSerializerTest {
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraEnumeratorStateSerializer}. */
+public class CassandraEnumeratorStateSerializerTest {

Review Comment:
   ```suggestion
   class CassandraEnumeratorStateSerializerTest {
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+    @Test
+    public void testKeySpaceTableExtractionRegexp() {
+        final Pattern pattern = 
Pattern.compile(CassandraSplitReader.SELECT_REGEXP);
+        Matcher matcher;
+        matcher = pattern.matcher("SELECT field FROM keyspace.table where 
field = value;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("SELECT * FROM keyspace.table;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1, field2 from 
keyspace.table;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1, field2 from keyspace.table 
LIMIT(1000);");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from keyspace.table ;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from keyspace.table where 
field1=1;");
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+
+        matcher = pattern.matcher("select field1 from table;"); // missing 
keyspace
+        assertThat(matcher.matches()).isFalse();
+
+        matcher = pattern.matcher("select field1 from keyspace.table"); // 
missing ";"
+        assertThat(matcher.matches()).isFalse();

Review Comment:
   Similar to the below test, there are basically only 2 cases here, so we 
could categorize inputs into accepted/rejected and call new assertion methods.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to 
assign. */
+public class CassandraEnumeratorState {
+    private final Queue<CassandraSplit> unassignedSplits;
+
+    public CassandraEnumeratorState() {
+        this.unassignedSplits = new ArrayDeque<>();
+    }
+
+    public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) {

Review Comment:
   ```suggestion
       private CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) 
{
   ```
   Would this be possible?



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to 
assign. */

Review Comment:
   ```suggestion
   /** State for {@link CassandraSplitEnumerator} to track the splits yet to 
assign. */
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and Flink
+ * source parallelism.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+
+    private final CassandraPartitioner partitioner;
+
+    public SplitsGenerator(CassandraPartitioner partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    /**
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the ring.
+     *
+     * @param numSplits requested number of splits
+     * @return list containing {@code numSplits} CassandraSplits.
+     */
+    public List<CassandraSplit> generateSplits(long numSplits) {

Review Comment:
   ```suggestion
       public List<CassandraSplit> generateSplits(int numSplits) {
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and Flink
+ * source parallelism.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+
+    private final CassandraPartitioner partitioner;
+
+    public SplitsGenerator(CassandraPartitioner partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    /**
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the ring.
+     *
+     * @param numSplits requested number of splits
+     * @return list containing {@code numSplits} CassandraSplits.
+     */
+    public List<CassandraSplit> generateSplits(long numSplits) {
+        if (numSplits == 1) {
+            return Collections.singletonList(
+                    new CassandraSplit(partitioner.minToken(), partitioner.maxToken()));
+        }
+        List<CassandraSplit> splits = new ArrayList<>();
+        BigInteger splitSize =
+                (partitioner.ringSize()).divide(new BigInteger(String.valueOf(numSplits)));
+
+        BigInteger startToken, endToken = partitioner.minToken();
+        for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+            startToken = endToken;
+            endToken = startToken.add(splitSize);
+            if (splitCount == numSplits) {
+                endToken = partitioner.maxToken();
+            }
+            splits.add(new CassandraSplit(startToken, endToken));
+        }

Review Comment:
   ```suggestion
           BigInteger startToken = partitioner.minToken();
           for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
               BigInteger endToken = startToken.add(splitSize);
               if (splitCount == numSplits) {
                   endToken = partitioner.maxToken();
               }
               splits.add(new CassandraSplit(startToken, endToken));
   
               startToken = endToken;
           }
   ```
   
   To me this feels more intuitive/readable. It's strange to assign minToken to 
endToken at the start, as is setting up the next iteration at the start of the 
current iteration.
   
   Personally I'd also move the `splitCount == numSplits` out of the loop. It 
is trivially true when a "simple" loop from 0 to `< numSplits` terminates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to