zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1091859558
########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/SplitsGeneratorTest.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SplitsGenerator}. */ +public final class SplitsGeneratorTest { Review Comment: ```suggestion final class SplitsGeneratorTest { ``` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.connector.cassandra.source.CassandraSource; + +import org.junit.jupiter.api.Test; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** tests for query generation and query sanity checks. 
*/ +class CassandraQueryTest { + + @Test + public void testKeySpaceTableExtractionRegexp() { + final Pattern pattern = Pattern.compile(CassandraSplitReader.SELECT_REGEXP); + Matcher matcher; + matcher = pattern.matcher("SELECT field FROM keyspace.table where field = value;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("SELECT * FROM keyspace.table;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1, field2 from keyspace.table;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1, field2 from keyspace.table LIMIT(1000);"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from keyspace.table ;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from keyspace.table where field1=1;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from table;"); // missing keyspace + assertThat(matcher.matches()).isFalse(); + + matcher = pattern.matcher("select field1 from keyspace.table"); // missing ";" + assertThat(matcher.matches()).isFalse(); + } + + @Test + public void testProhibitedClauses() { + assertThatThrownBy( + () -> + CassandraSource.checkQueryValidity( + "SELECT COUNT(*) from flink.table;")) + .isInstanceOf(IllegalStateException.class) 
+ .hasMessageContaining("query must not contain aggregate or order clauses "); + assertThatThrownBy( + () -> CassandraSource.checkQueryValidity("SELECT AVG(*) from flink.table;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); + + assertThatThrownBy( + () -> CassandraSource.checkQueryValidity("SELECT MIN(*) from flink.table;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); + assertThatThrownBy( + () -> CassandraSource.checkQueryValidity("SELECT MAX(*) from flink.table;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); + assertThatThrownBy( + () -> CassandraSource.checkQueryValidity("SELECT SUM(*) from flink.table;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); + assertThatThrownBy( + () -> + CassandraSource.checkQueryValidity( + "SELECT field1, field2 from flink.table ORDER BY field1;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); + assertThatThrownBy( + () -> + CassandraSource.checkQueryValidity( + "SELECT field1, field2 from flink.table GROUP BY field1;")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("query must not contain aggregate or order clauses "); Review Comment: You could make this a bit more readable by adding another method that does the assertions, and in here you do something like: ``` Stream.of( "SELECT COUNT(*) from flink.table;", ...) .forEach(CassandraQueryTest::assertProhibitedClauseRejected);
``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -41,139 +41,31 @@ public SplitsGenerator(CassandraPartitioner partitioner) { } /** - * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} - * splits. Each split can contain several token ranges in order to reduce the overhead of - * Cassandra vnodes. Currently, token range grouping is not smart and doesn't check if they - * share the same replicas. + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the ring. * - * @param totalSplitCount requested total amount of splits. This function may generate more - * splits. - * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring - * order. - * @return list containing at least {@code totalSplitCount} CassandraSplits. + * @param numSplits requested number of splits + * @return list containing {@code numSplits} CassandraSplits. */ - public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { - if (totalSplitCount == 1) { - RingRange totalRingRange = RingRange.of(partitioner.min(), partitioner.max()); + public List<CassandraSplit> generateSplits(long numSplits) { Review Comment: AFAICT this could be a static method; there isn't really a benefit in having a `SplitsGenerator` object. It's only created, used exactly once, and then discarded right away, all happening in the same method. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Objects; +import java.util.Queue; + +/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to assign. */ +public class CassandraEnumeratorState { + private final Queue<CassandraSplit> unassignedSplits; + + public CassandraEnumeratorState() { + this.unassignedSplits = new ArrayDeque<>(); + } + + public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) { + this.unassignedSplits = unassignedSplits; + } + + public void addNewSplits(Collection<CassandraSplit> newSplits) { + unassignedSplits.addAll(newSplits); + } + + public CassandraSplit getASplit() { Review Comment: add `@Nullable` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Objects; +import java.util.Queue; + +/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to assign. */ +public class CassandraEnumeratorState { + private final Queue<CassandraSplit> unassignedSplits; + + public CassandraEnumeratorState() { + this.unassignedSplits = new ArrayDeque<>(); + } + + public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) { + this.unassignedSplits = unassignedSplits; Review Comment: add `checkNotNull` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitState.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import com.datastax.driver.core.ResultSet; + +import javax.annotation.Nullable; + +import java.math.BigInteger; + +/** + * Mutable {@link CassandraSplit} that keeps track of the reading process of the associated split. + */ +public class CassandraSplitState { + private final CassandraSplit cassandraSplit; + // Cassandra ResultSet is paginated, a new page is read only if all the records of the previous + // one were consumed. fetch() can be interrupted so we use the resultSet to keep track of the + // reading process. + // It is null when reading has not started (before fetch is called on the split). + @Nullable private ResultSet resultSet; Review Comment: reminder: Curious how this is meant to persist interrupted fetches with checkpoints in-between fetches. Risk of duplicating input. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra 
source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. + */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. + wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { Review Comment: > I added MAX_RECORDS_PER_SPLIT conf parameter as proposed above because I did not know if such parameter existed already. Feel I'll look around. The naming seems off though; it's not a maximum records for a split (which would be more relevant for the `SplitGenerator`), but more of a `batch size`. 
> So, I could store in the CassandraSplitState a reference to the ResultSet to resume the output of remaining records on a later fetch(). You also have to consider how this information can be stored in a checkpoint. A checkpoint may happen between 2 fetches for 1 split (== the first half of the emitted records will be remembered by Flink), and then the job is restarted. As-is you'd read the first half again, because your split state isn't written to checkpoints. Not sure how to solve that. If Cassandra has a deterministic order in which it returns records (it probably doesn't :see_no_evil: ) then you could remember the count and skip that many records, or adjust the start token of the split. Alternatively we could consider handling this in the `SplitsGenerator`, generating as many splits as necessary to keep at most N records in memory in each split reader. (We'd never have the case that a single split contains the entire table.) ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CassandraSplitSerializer}. */ +public class CassandraSplitSerializerTest { Review Comment: ```suggestion class CassandraSplitSerializerTest { ``` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CassandraEnumeratorStateSerializer}. 
*/ +public class CassandraEnumeratorStateSerializerTest { Review Comment: ```suggestion class CassandraEnumeratorStateSerializerTest { ``` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.connector.cassandra.source.CassandraSource; + +import org.junit.jupiter.api.Test; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** tests for query generation and query sanity checks. 
*/ +class CassandraQueryTest { + + @Test + public void testKeySpaceTableExtractionRegexp() { + final Pattern pattern = Pattern.compile(CassandraSplitReader.SELECT_REGEXP); + Matcher matcher; + matcher = pattern.matcher("SELECT field FROM keyspace.table where field = value;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("SELECT * FROM keyspace.table;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1, field2 from keyspace.table;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1, field2 from keyspace.table LIMIT(1000);"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from keyspace.table ;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from keyspace.table where field1=1;"); + assertThat(matcher.matches()).isTrue(); + assertThat(matcher.group(1)).isEqualTo("keyspace"); + assertThat(matcher.group(2)).isEqualTo("table"); + + matcher = pattern.matcher("select field1 from table;"); // missing keyspace + assertThat(matcher.matches()).isFalse(); + + matcher = pattern.matcher("select field1 from keyspace.table"); // missing ";" + assertThat(matcher.matches()).isFalse(); Review Comment: Similar to the below test, there are basically only 2 cases here, so we could categorize inputs into accepted/rejected and call new assertion methods. 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Objects; +import java.util.Queue; + +/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to assign. */ +public class CassandraEnumeratorState { + private final Queue<CassandraSplit> unassignedSplits; + + public CassandraEnumeratorState() { + this.unassignedSplits = new ArrayDeque<>(); + } + + public CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) { Review Comment: ```suggestion private CassandraEnumeratorState(Queue<CassandraSplit> unassignedSplits) { ``` Would this be possible? 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Objects; +import java.util.Queue; + +/** Sate for {@link CassandraSplitEnumerator} to track the splits yet to assign. */ Review Comment: ```suggestion /** State for {@link CassandraSplitEnumerator} to track the splits yet to assign. */ ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and Flink + * source parallelism. + */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final CassandraPartitioner partitioner; + + public SplitsGenerator(CassandraPartitioner partitioner) { + this.partitioner = partitioner; + } + + /** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the ring. + * + * @param numSplits requested number of splits + * @return list containing {@code numSplits} CassandraSplits. + */ + public List<CassandraSplit> generateSplits(long numSplits) { Review Comment: ```suggestion public List<CassandraSplit> generateSplits(int numSplits) { ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and Flink + * source parallelism. + */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final CassandraPartitioner partitioner; + + public SplitsGenerator(CassandraPartitioner partitioner) { + this.partitioner = partitioner; + } + + /** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the ring. + * + * @param numSplits requested number of splits + * @return list containing {@code numSplits} CassandraSplits. 
+ */ + public List<CassandraSplit> generateSplits(long numSplits) { + if (numSplits == 1) { + return Collections.singletonList( + new CassandraSplit(partitioner.minToken(), partitioner.maxToken())); + } + List<CassandraSplit> splits = new ArrayList<>(); + BigInteger splitSize = + (partitioner.ringSize()).divide(new BigInteger(String.valueOf(numSplits))); + + BigInteger startToken, endToken = partitioner.minToken(); + for (int splitCount = 1; splitCount <= numSplits; splitCount++) { + startToken = endToken; + endToken = startToken.add(splitSize); + if (splitCount == numSplits) { + endToken = partitioner.maxToken(); + } + splits.add(new CassandraSplit(startToken, endToken)); + } Review Comment: ```suggestion BigInteger startToken = partitioner.minToken(); for (int splitCount = 1; splitCount <= numSplits; splitCount++) { BigInteger endToken = startToken.add(splitSize); if (splitCount == numSplits) { endToken = partitioner.maxToken(); } splits.add(new CassandraSplit(startToken, endToken)); startToken = endToken; } ``` To me this feels more intuitive/readable. It's strange to assign minToken to endToken at the start, as is setting up the next iteration at the start of the current iteration. Personally I'd also move the `splitCount == numSplits` out of the loop. It is trivially true when a "simple" loop from 0 to `< numSplits` terminates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
