zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1072189999
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Immutable {@link SourceSplit} for Cassandra source. A Cassandra split is just a set of {@link Review Comment: ```suggestion * Immutable {@link SourceSplit} for Cassandra source. A Cassandra split is a set of {@link ``` Documentation guidelines and all that. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { Review Comment: Reminder to check why this is serializable. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. + */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } Review Comment: strange that this logic is different to RingRange#span. 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. 
+ */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} + * splits. Each split can contain several token ranges in order to reduce the overhead of + * Cassandra vnodes. Currently, token range grouping is not smart and doesn't check if they + * share the same replicas. + * + * @param totalSplitCount requested total amount of splits. This function may generate more + * splits. + * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring + * order. + * @return list containing at least {@code totalSplitCount} CassandraSplits. 
+ */ + public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { + if (totalSplitCount == 1) { + RingRange totalRingRange = RingRange.of(rangeMin, rangeMax); + return Collections.singletonList( + new CassandraSplit(Collections.singleton(totalRingRange))); + } + int tokenRangeCount = ringTokens.size(); + + List<RingRange> ringRanges = new ArrayList<>(); + for (int i = 0; i < tokenRangeCount; i++) { + BigInteger start = ringTokens.get(i); + BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); + + if (isNotInRange(start) || isNotInRange(stop)) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s) not in range of %s", start, stop, partitioner)); + } + if (start.equals(stop) && tokenRangeCount != 1) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s): two nodes have the same token", start, stop)); + } + + BigInteger rangeSize = stop.subtract(start); + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) { + // wrap around case + rangeSize = rangeSize.add(this.rangeSize); + } + + // the below, in essence, does this: + // splitCount = Maths.ceil((rangeSize / cluster range size) * totalSplitCount) Review Comment: This is the first time that "cluster range size" is mentioned, and it isn't explained anywhere what it is or what its relation to `rangeSize` is. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { + + private final BigInteger start; + private final BigInteger end; + + private RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public static RingRange of(BigInteger start, BigInteger end) { + return new RingRange(start, end); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + /** + * Returns the size of this range. + * + * @return size of the range, max - range, in case of wrap + */ + BigInteger span(BigInteger ringSize) { + return (start.compareTo(end) >= 0) + ? end.subtract(start).add(ringSize) + : end.subtract(start); + } + + /** @return true if the ringRange overlaps. Note that if start == end, then wrapping is true */ Review Comment: Not sure what is meant by "overlaps". Why do we return true if start == end? Does start == end mean that only a single element is part of the range? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. + */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. 
+ wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { Review Comment: Am I reading this correctly that you're reading _all_ records from _all_ splits into memory before passing them on? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator; +import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReader; +import org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bounded source to read from Cassandra and return a collection of entities as {@code + * DataStream<Entity>}. 
An entity is built by Cassandra mapper ({@code + * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described + * in <a + * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/"> + * Cassandra object mapper</a>). + * + * <p>To use it, do the following: + * + * <pre>{@code + * ClusterBuilder clusterBuilder = new ClusterBuilder() { + * @Override + * protected Cluster buildCluster(Cluster.Builder builder) { + * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT)) + * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL)) + * .withSocketOptions(new SocketOptions() + * .setConnectTimeoutMillis(CONNECT_TIMEOUT) + * .setReadTimeoutMillis(READ_TIMEOUT)) + * .build(); + * } + * }; + * Source cassandraSource = new CassandraSource(clusterBuilder, + * Pojo.class, + * "select ... from KEYSPACE.TABLE ...;", + * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + * + * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), + * "CassandraSource"); + * }</pre> + */ +@PublicEvolving +public class CassandraSource<OUT> + implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> { + + public static final String CQL_PROHIBITTED_CLAUSES_REGEXP = + "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"; + private static final long serialVersionUID = 7773196541275567433L; Review Comment: serial version ids should start at 1. https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { + + private final BigInteger start; + private final BigInteger end; + + private RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public static RingRange of(BigInteger start, BigInteger end) { + return new RingRange(start, end); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + /** + * Returns the size of this range. + * + * @return size of the range, max - range, in case of wrap + */ + BigInteger span(BigInteger ringSize) { + return (start.compareTo(end) >= 0) Review Comment: ```suggestion return isWrapping() ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. + */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { Review Comment: Should probably use an enum. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { + + private final BigInteger start; + private final BigInteger end; + + private RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public static RingRange of(BigInteger start, BigInteger end) { + return new RingRange(start, end); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + /** + * Returns the size of this range. + * + * @return size of the range, max - range, in case of wrap + */ + BigInteger span(BigInteger ringSize) { Review Comment: So why is this called span instead of "size"? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { + + private final BigInteger start; + private final BigInteger end; + + private RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public static RingRange of(BigInteger start, BigInteger end) { + return new RingRange(start, end); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + /** + * Returns the size of this range. + * + * @return size of the range, max - range, in case of wrap + */ + BigInteger span(BigInteger ringSize) { + return (start.compareTo(end) >= 0) + ? end.subtract(start).add(ringSize) Review Comment: ```suggestion ? ringSize.subtract(end).add(start) ``` This was easier for me to visualize. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.connector.cassandra.source.split.RingRange; + +import com.datastax.driver.core.ExecutionInfo; +import com.datastax.driver.core.Row; + +/** + * Wrapper for Cassandra {@link Row} that stores associated {@link RingRange} to be able to update + * split states. It also stores {@link ExecutionInfo} Cassandra statistics about the query execution + * that produced this row. + */ +public class CassandraRow { + + private final Row row; + private final RingRange associatedRingRange; + private final ExecutionInfo executionInfo; Review Comment: Who is actually using this? I see we pass it to the result set when creating the mapper; is it actually relevant there? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { + + private final BigInteger start; + private final BigInteger end; + + private RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public static RingRange of(BigInteger start, BigInteger end) { + return new RingRange(start, end); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + /** + * Returns the size of this range. + * + * @return size of the range, max - range, in case of wrap + */ + BigInteger span(BigInteger ringSize) { + return (start.compareTo(end) >= 0) + ? end.subtract(start).add(ringSize) + : end.subtract(start); + } + + /** @return true if the ringRange overlaps. Note that if start == end, then wrapping is true */ + public boolean isWrapping() { + return start.compareTo(end) >= 0; + } + + @Override + public String toString() { + return String.format("(%s,%s]", start.toString(), end.toString()); + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RingRange ringRange = (RingRange) o; + + if (getStart() != null + ? 
+ !getStart().equals(ringRange.getStart()) + : ringRange.getStart() != null) { + return false; + } + return getEnd() != null ? getEnd().equals(ringRange.getEnd()) : ringRange.getEnd() == null; + } + + @Override + public int hashCode() { + int result = getStart() != null ? getStart().hashCode() : 0; Review Comment: Are start/end actually nullable? If so, I'd like to know why, and various @Nullable annotations are missing. If not, then hashCode/equals can be simplified. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism.
+ */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} Review Comment: Why does it potentially create more splits than requested? Under what circumstances does that happen? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. 
*/ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + + private final SplitEnumeratorContext<CassandraSplit> enumeratorContext; + private final CassandraEnumeratorState state; + private final Cluster cluster; + + public CassandraSplitEnumerator( + SplitEnumeratorContext<CassandraSplit> enumeratorContext, + CassandraEnumeratorState state, + ClusterBuilder clusterBuilder) { + this.enumeratorContext = enumeratorContext; + this.state = state == null ? new CassandraEnumeratorState() : state /* snapshot restore*/; + this.cluster = clusterBuilder.getCluster(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + assignUnprocessedSplitsToReader(subtaskId); + } + + @Override + public void start() { + // discover the splits and update unprocessed splits and then assign them. + // There is only an initial splits discovery, no periodic discovery. 
+ enumeratorContext.callAsync( + this::discoverSplits, + (splits, throwable) -> { + LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); + state.addNewSplits(splits, enumeratorContext.currentParallelism()); + }); + } + + private List<CassandraSplit> discoverSplits() { + final int numberOfSplits = enumeratorContext.currentParallelism(); + final Metadata clusterMetadata = cluster.getMetadata(); + final String partitioner = clusterMetadata.getPartitioner(); + final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); + if (MURMUR3PARTITIONER.equals(partitioner)) { + LOG.info("Murmur3Partitioner detected, splitting"); + List<BigInteger> tokens = + clusterMetadata.getTokenRanges().stream() + .map( + tokenRange -> + new BigInteger( + tokenRange.getEnd().getValue().toString())) + .collect(Collectors.toList()); + return splitsGenerator.generateSplits(numberOfSplits, tokens); + } else { + // Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ + // see + // https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html + LOG.warn( + "The current Cassandra partitioner is {}, only Murmur3Partitioner is supported " + + "for splitting, using an single split", + partitioner); + return splitsGenerator.generateSplits(1, Collections.emptyList()); + } + } + + @Override + public void addSplitsBack(List<CassandraSplit> splits, int subtaskId) { + LOG.info("Add {} splits back to CassandraSplitEnumerator.", splits.size()); + state.addNewSplits(splits, enumeratorContext.currentParallelism()); + assignUnprocessedSplitsToReader(subtaskId); + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Adding reader {} to CassandraSplitEnumerator.", subtaskId); + assignUnprocessedSplitsToReader(subtaskId); + } + + private void assignUnprocessedSplitsToReader(int readerId) { + checkReaderRegistered(readerId); + + final Set<CassandraSplit> splitsForReader = state.getSplitsForReader(readerId); 
Review Comment: For something that is mostly targeted at batch workloads it seems strange to pre-assign splits to readers. Rather they should be fully distributed on demand to prevent stragglers. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** Serializer for {@link CassandraEnumeratorState}. 
*/ +public class CassandraEnumeratorStateSerializer + implements SimpleVersionedSerializer<CassandraEnumeratorState> { + + public static final CassandraEnumeratorStateSerializer INSTANCE = + new CassandraEnumeratorStateSerializer(); + public static final int CURRENT_VERSION = 0; + + private CassandraEnumeratorStateSerializer() { // singleton + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(cassandraEnumeratorState); Review Comment: Don't use Java serialization. Manually write the components that make up a split for easier migration in the future. https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. + */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} + * splits. Each split can contain several token ranges in order to reduce the overhead of + * Cassandra vnodes. 
Currently, token range grouping is not smart and doesn't check if they + * share the same replicas. + * + * @param totalSplitCount requested total amount of splits. This function may generate more + * splits. + * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring + * order. + * @return list containing at least {@code totalSplitCount} CassandraSplits. + */ + public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { + if (totalSplitCount == 1) { + RingRange totalRingRange = RingRange.of(rangeMin, rangeMax); + return Collections.singletonList( + new CassandraSplit(Collections.singleton(totalRingRange))); + } + int tokenRangeCount = ringTokens.size(); + + List<RingRange> ringRanges = new ArrayList<>(); + for (int i = 0; i < tokenRangeCount; i++) { + BigInteger start = ringTokens.get(i); + BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); + + if (isNotInRange(start) || isNotInRange(stop)) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s) not in range of %s", start, stop, partitioner)); + } + if (start.equals(stop) && tokenRangeCount != 1) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s): two nodes have the same token", start, stop)); + } + + BigInteger rangeSize = stop.subtract(start); + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) { + // wrap around case + rangeSize = rangeSize.add(this.rangeSize); + } Review Comment: this is the third implementation for computing the size/span of a range. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. + */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); Review Comment: Where do these numbers come from? 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import 
com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. + */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement 
preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. + wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { + // allow to interrupt the reading of splits as requested in the API + if (wakeup.get()) { + break; + } + if (!cassandraSplitState.isEmpty()) { + try { + final Set<RingRange> ringRanges = + cassandraSplitState.getUnprocessedRingRanges(); + final String cassandraSplitId = cassandraSplitState.getSplitId(); + + for (RingRange ringRange : ringRanges) { + Token startToken = + clusterMetadata.newToken(ringRange.getStart().toString()); + Token endToken = clusterMetadata.newToken(ringRange.getEnd().toString()); + if (ringRange.isWrapping()) { + // A wrapping range is one that overlaps from the end of the partitioner + // range and its + // start (ie : when the start token of the split is greater than the end + // token) + // We need to generate two queries here : one that goes from the start + // token to the end + // of + // the partitioner range, and the other from the start of the + // partitioner range to the + // end token of the split. 
+ + addRecordsToOutput( + session.execute( + getLowestSplitQuery( + query, partitionKey, ringRange.getEnd())), + recordsBySplit, + cassandraSplitId, + ringRange); + addRecordsToOutput( + session.execute( + getHighestSplitQuery( + query, partitionKey, ringRange.getStart())), + recordsBySplit, + cassandraSplitId, + ringRange); + } else { + addRecordsToOutput( + session.execute( + preparedStatement + .bind() + .setToken(0, startToken) + .setToken(1, endToken)), + recordsBySplit, + cassandraSplitId, + ringRange); + } + cassandraSplitState.markRingRangeAsFinished(ringRange); + } + // put the already read split to finished splits + finishedSplits.add(cassandraSplitState.getSplitId()); + // for reentrant calls: if fetch is woken up, + // do not reprocess the already processed splits + unprocessedSplits.remove(cassandraSplitState); + } catch (Exception ex) { + LOG.error("Error while reading split ", ex); + } + } else { + finishedSplits.add(cassandraSplitState.getSplitId()); + } + } + return new RecordsBySplits<>(recordsBySplit, finishedSplits); + } + + private String getPartitionKey(Metadata clusterMetadata) { + Matcher queryMatcher = Pattern.compile(SELECT_REGEXP).matcher(query); + if (!queryMatcher.matches()) { + throw new IllegalStateException( + String.format( + "Failed to extract keyspace and table out of the provided query: %s", + query)); + } + String keyspace = queryMatcher.group(1); + String table = queryMatcher.group(2); + return clusterMetadata.getKeyspace(keyspace).getTable(table).getPartitionKey().stream() + .map(ColumnMetadata::getName) + .collect(Collectors.joining(",")); + } + + @Override + public void wakeUp() { + wakeup.compareAndSet(false, true); + } + + @Override + public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChanges) { + for (CassandraSplit cassandraSplit : splitsChanges.splits()) { + unprocessedSplits.add(cassandraSplit.toSplitState()); + } + } + + @VisibleForTesting + static String getHighestSplitQuery(String query, String 
partitionKey, BigInteger highest) { + return generateQuery( + query, partitionKey, highest, " (token(%s) >= %d) AND", " WHERE (token(%s) >= %d)"); + } + + @VisibleForTesting + static String getLowestSplitQuery(String query, String partitionKey, BigInteger lowest) { + return generateQuery( + query, partitionKey, lowest, " (token(%s) < %d) AND", " WHERE (token(%s) < %d)"); + } + + @VisibleForTesting + static String generateRangeQuery(String query, String partitionKey) { + return generateQuery( + query, + partitionKey, + null, + " (token(%s) >= ?) AND (token(%s) < ?) AND", + " WHERE (token(%s) >= ?) AND (token(%s) < ?)"); + } + + private static String generateQuery( + String query, + String partitionKey, + @Nullable BigInteger token, + String whereFilter, + String noWhereFilter) { + Matcher queryMatcher = Pattern.compile(SELECT_REGEXP).matcher(query); + if (!queryMatcher.matches()) { + throw new IllegalStateException( + String.format( + "Failed to extract keyspace and table out of the provided query: %s", + query)); + } + final int whereIndex = query.toLowerCase().indexOf("where"); + int insertionPoint; + String filter; + if (whereIndex != -1) { + insertionPoint = whereIndex + "where".length(); + filter = + (token == null) + ? String.format(whereFilter, partitionKey, partitionKey) + : String.format(whereFilter, partitionKey, token); + } else { + // end of keyspace.table + insertionPoint = queryMatcher.end(2); + filter = + (token == null) + ? String.format(noWhereFilter, partitionKey, partitionKey) + : String.format(noWhereFilter, partitionKey, token); + } + return String.format( + "%s%s%s", + query.substring(0, insertionPoint), filter, query.substring(insertionPoint)); + } + + /** + * This method populates the {@code Map<String, Collection<CassandraRow>> recordsBySplit} map + * that is used to create the {@link RecordsBySplits} that are output by the fetch method. It + * modifies its {@code output} parameter. 
+ */ + private void addRecordsToOutput( + ResultSet resultSet, Review Comment: Why aren't we doing the row->pojo mapping here? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. 
+ */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} + * splits. Each split can contain several token ranges in order to reduce the overhead of + * Cassandra vnodes. Currently, token range grouping is not smart and doesn't check if they + * share the same replicas. + * + * @param totalSplitCount requested total amount of splits. This function may generate more + * splits. + * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring + * order. + * @return list containing at least {@code totalSplitCount} CassandraSplits. 
+ */ + public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { + if (totalSplitCount == 1) { + RingRange totalRingRange = RingRange.of(rangeMin, rangeMax); + return Collections.singletonList( + new CassandraSplit(Collections.singleton(totalRingRange))); + } + int tokenRangeCount = ringTokens.size(); + + List<RingRange> ringRanges = new ArrayList<>(); + for (int i = 0; i < tokenRangeCount; i++) { + BigInteger start = ringTokens.get(i); + BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); + + if (isNotInRange(start) || isNotInRange(stop)) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s) not in range of %s", start, stop, partitioner)); + } + if (start.equals(stop) && tokenRangeCount != 1) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s): two nodes have the same token", start, stop)); + } + + BigInteger rangeSize = stop.subtract(start); + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) { + // wrap around case + rangeSize = rangeSize.add(this.rangeSize); + } + + // the below, in essence, does this: + // splitCount = Maths.ceil((rangeSize / cluster range size) * totalSplitCount) + BigInteger[] splitCountAndRemainder = Review Comment: I'd appreciate some more comments as to what _all_ the code below this line does. My brain can't really comprehend it right now. It looks like we so far computed the token ranges, now we're splitting them (to get closer to the desired split count?) and then merge them again for some token requirement or something. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** Serializer for {@link CassandraSplit}. */ +public class CassandraSplitSerializer implements SimpleVersionedSerializer<CassandraSplit> { + + public static final CassandraSplitSerializer INSTANCE = new CassandraSplitSerializer(); + + public static final int CURRENT_VERSION = 0; + + private CassandraSplitSerializer() {} + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(CassandraSplit cassandraSplit) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(cassandraSplit); Review Comment: Don't use Java serialization. Manually write the components that make up a split, for easier migration in the future.
https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. 
+ */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} + * splits. Each split can contain several token ranges in order to reduce the overhead of + * Cassandra vnodes. Currently, token range grouping is not smart and doesn't check if they + * share the same replicas. + * + * @param totalSplitCount requested total amount of splits. This function may generate more + * splits. + * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring + * order. + * @return list containing at least {@code totalSplitCount} CassandraSplits. 
+ */ + public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { + if (totalSplitCount == 1) { + RingRange totalRingRange = RingRange.of(rangeMin, rangeMax); + return Collections.singletonList( + new CassandraSplit(Collections.singleton(totalRingRange))); + } + int tokenRangeCount = ringTokens.size(); + + List<RingRange> ringRanges = new ArrayList<>(); + for (int i = 0; i < tokenRangeCount; i++) { + BigInteger start = ringTokens.get(i); + BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); + + if (isNotInRange(start) || isNotInRange(stop)) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s) not in range of %s", start, stop, partitioner)); + } + if (start.equals(stop) && tokenRangeCount != 1) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s): two nodes have the same token", start, stop)); + } + + BigInteger rangeSize = stop.subtract(start); + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) { + // wrap around case + rangeSize = rangeSize.add(this.rangeSize); + } + + // the below, in essence, does this: + // splitCount = Maths.ceil((rangeSize / cluster range size) * totalSplitCount) + BigInteger[] splitCountAndRemainder = + rangeSize + .multiply(BigInteger.valueOf(totalSplitCount)) + .divideAndRemainder(this.rangeSize); + + int splitCount = + splitCountAndRemainder[0].intValue() + + (splitCountAndRemainder[1].equals(BigInteger.ZERO) ? 
0 : 1); + + LOG.debug("Dividing token range [{},{}) into {} splits", start, stop, splitCount); + + // Make BigInteger list of all the endpoints for the splits, including both start and + // stop + List<BigInteger> endpointTokens = new ArrayList<>(); + for (int j = 0; j <= splitCount; j++) { + BigInteger offset = + rangeSize + .multiply(BigInteger.valueOf(j)) + .divide(BigInteger.valueOf(splitCount)); + BigInteger token = start.add(offset); + if (token.compareTo(rangeMax) > 0) { + token = token.subtract(this.rangeSize); + } + // Long.MIN_VALUE is not a valid token and has to be silently incremented. + // See https://issues.apache.org/jira/browse/CASSANDRA-14684 + endpointTokens.add( + token.equals(BigInteger.valueOf(Long.MIN_VALUE)) + ? token.add(BigInteger.ONE) + : token); + } + + // Append the ringRanges between the endpoints + for (int j = 0; j < splitCount; j++) { + ringRanges.add(RingRange.of(endpointTokens.get(j), endpointTokens.get(j + 1))); + LOG.debug( + "Split #{}: [{},{})", + j + 1, + endpointTokens.get(j), + endpointTokens.get(j + 1)); + } + } + + BigInteger total = BigInteger.ZERO; + for (RingRange split : ringRanges) { + BigInteger size = split.span(rangeSize); + total = total.add(size); + } + if (!total.equals(rangeSize)) { + throw new RuntimeException( + "Some tokens are missing from the splits. 
This should not happen."); + } + return coalesceRingRanges(getTargetSplitSize(totalSplitCount), ringRanges); + } + + private boolean isNotInRange(BigInteger token) { + return token.compareTo(rangeMin) < 0 || token.compareTo(rangeMax) > 0; + } + + private List<CassandraSplit> coalesceRingRanges( + BigInteger targetSplitSize, List<RingRange> ringRanges) { + List<CassandraSplit> coalescedSplits = new ArrayList<>(); + List<RingRange> tokenRangesForCurrentSplit = new ArrayList<>(); + BigInteger tokenCount = BigInteger.ZERO; + + for (RingRange tokenRange : ringRanges) { + if (tokenRange.span(rangeSize).add(tokenCount).compareTo(targetSplitSize) > 0 + && !tokenRangesForCurrentSplit.isEmpty()) { + // enough tokens in that segment Review Comment: enough tokens _for what_? Can you explain a bit what tokens are? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set Review Comment: explain _why_ we need it ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s by generating {@link RingRange}s based on Cassandra + * cluster partitioner and Flink source parallelism. + */ +public final class SplitsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final String partitioner; + private final BigInteger rangeMin; + private final BigInteger rangeMax; + private final BigInteger rangeSize; + + public SplitsGenerator(String partitioner) { + this.partitioner = partitioner; + rangeMin = getRangeMin(); + rangeMax = getRangeMax(); + rangeSize = getRangeSize(); + } + + private BigInteger getRangeMin() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.ZERO; + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).negate(); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeMax() { + if (partitioner.endsWith("RandomPartitioner")) { + return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE); + } else if (partitioner.endsWith("Murmur3Partitioner")) { + return BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE); + } else { + throw new UnsupportedOperationException( + "Unsupported partitioner. " + "Only Random and Murmur3 are supported"); + } + } + + private BigInteger getRangeSize() { + return rangeMax.subtract(rangeMin).add(BigInteger.ONE); + } + + /** + * Given properly ordered list of Cassandra tokens, compute at least {@code totalSplitCount} + * splits. Each split can contain several token ranges in order to reduce the overhead of + * Cassandra vnodes. 
Currently, token range grouping is not smart and doesn't check if they + * share the same replicas. + * + * @param totalSplitCount requested total amount of splits. This function may generate more + * splits. + * @param ringTokens list of all start tokens in Cassandra cluster. They have to be in ring + * order. + * @return list containing at least {@code totalSplitCount} CassandraSplits. + */ + public List<CassandraSplit> generateSplits(long totalSplitCount, List<BigInteger> ringTokens) { + if (totalSplitCount == 1) { + RingRange totalRingRange = RingRange.of(rangeMin, rangeMax); + return Collections.singletonList( + new CassandraSplit(Collections.singleton(totalRingRange))); + } + int tokenRangeCount = ringTokens.size(); + + List<RingRange> ringRanges = new ArrayList<>(); + for (int i = 0; i < tokenRangeCount; i++) { + BigInteger start = ringTokens.get(i); + BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); + + if (isNotInRange(start) || isNotInRange(stop)) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s) not in range of %s", start, stop, partitioner)); + } + if (start.equals(stop) && tokenRangeCount != 1) { + throw new RuntimeException( + String.format( + "Tokens (%s,%s): two nodes have the same token", start, stop)); + } + + BigInteger rangeSize = stop.subtract(start); + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) { + // wrap around case + rangeSize = rangeSize.add(this.rangeSize); + } + + // the below, in essence, does this: + // splitCount = Maths.ceil((rangeSize / cluster range size) * totalSplitCount) + BigInteger[] splitCountAndRemainder = + rangeSize + .multiply(BigInteger.valueOf(totalSplitCount)) + .divideAndRemainder(this.rangeSize); + + int splitCount = + splitCountAndRemainder[0].intValue() + + (splitCountAndRemainder[1].equals(BigInteger.ZERO) ? 
0 : 1); + + LOG.debug("Dividing token range [{},{}) into {} splits", start, stop, splitCount); + + // Make BigInteger list of all the endpoints for the splits, including both start and + // stop + List<BigInteger> endpointTokens = new ArrayList<>(); + for (int j = 0; j <= splitCount; j++) { + BigInteger offset = + rangeSize + .multiply(BigInteger.valueOf(j)) + .divide(BigInteger.valueOf(splitCount)); + BigInteger token = start.add(offset); + if (token.compareTo(rangeMax) > 0) { + token = token.subtract(this.rangeSize); + } + // Long.MIN_VALUE is not a valid token and has to be silently incremented. + // See https://issues.apache.org/jira/browse/CASSANDRA-14684 + endpointTokens.add( + token.equals(BigInteger.valueOf(Long.MIN_VALUE)) + ? token.add(BigInteger.ONE) + : token); + } + + // Append the ringRanges between the endpoints + for (int j = 0; j < splitCount; j++) { + ringRanges.add(RingRange.of(endpointTokens.get(j), endpointTokens.get(j + 1))); + LOG.debug( + "Split #{}: [{},{})", + j + 1, + endpointTokens.get(j), + endpointTokens.get(j + 1)); + } + } + + BigInteger total = BigInteger.ZERO; + for (RingRange split : ringRanges) { + BigInteger size = split.span(rangeSize); + total = total.add(size); + } + if (!total.equals(rangeSize)) { + throw new RuntimeException( + "Some tokens are missing from the splits. 
This should not happen."); + } + return coalesceRingRanges(getTargetSplitSize(totalSplitCount), ringRanges); + } + + private boolean isNotInRange(BigInteger token) { + return token.compareTo(rangeMin) < 0 || token.compareTo(rangeMax) > 0; + } + + private List<CassandraSplit> coalesceRingRanges( + BigInteger targetSplitSize, List<RingRange> ringRanges) { + List<CassandraSplit> coalescedSplits = new ArrayList<>(); + List<RingRange> tokenRangesForCurrentSplit = new ArrayList<>(); + BigInteger tokenCount = BigInteger.ZERO; + + for (RingRange tokenRange : ringRanges) { + if (tokenRange.span(rangeSize).add(tokenCount).compareTo(targetSplitSize) > 0 + && !tokenRangesForCurrentSplit.isEmpty()) { + // enough tokens in that segment + LOG.debug( + "Got enough tokens for one split ({}) : {}", + tokenCount, + tokenRangesForCurrentSplit); + coalescedSplits.add(new CassandraSplit(new HashSet<>(tokenRangesForCurrentSplit))); + tokenRangesForCurrentSplit = new ArrayList<>(); + tokenCount = BigInteger.ZERO; + } + + tokenCount = tokenCount.add(tokenRange.span(rangeSize)); + tokenRangesForCurrentSplit.add(tokenRange); + } + + if (!tokenRangesForCurrentSplit.isEmpty()) { + coalescedSplits.add(new CassandraSplit(new HashSet<>(tokenRangesForCurrentSplit))); + } + return coalescedSplits; + } + + private BigInteger getTargetSplitSize(long splitCount) { + return rangeMax.subtract(rangeMin).divide(BigInteger.valueOf(splitCount)); Review Comment: Are tokens evenly distributed in the range or do they depend on the number of elements within a range? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ExecutionInfo; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * {@link RecordEmitter} that converts the {@link CassandraRow} read by the {@link + * CassandraSplitReader} to specified POJO and output it while updating splits state. This class + * uses the Cassandra driver mapper to map the row to the POJO. 
+ * + * @param <OUT> type of POJO record to output + */ +public class CassandraRecordEmitter<OUT> + implements RecordEmitter<CassandraRow, OUT, CassandraSplitState> { + + private final Mapper<OUT> mapper; + + public CassandraRecordEmitter( + Class<OUT> pojoClass, ClusterBuilder clusterBuilder, MapperOptions mapperOptions) { + // session and cluster are managed at the SplitReader level. So we need to create one + // locally here just to me able to create the mapper. + final Cluster cluster = clusterBuilder.getCluster(); + final Session session = cluster.connect(); Review Comment: this seems sketchy. Have you considered just having the user provide a mapping function from row -> pojo? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. 
+ wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { + // allow to interrupt the reading of splits as requested in the API + if (wakeup.get()) { + break; + } + if (!cassandraSplitState.isEmpty()) { + try { + final Set<RingRange> ringRanges = + cassandraSplitState.getUnprocessedRingRanges(); + final String cassandraSplitId = cassandraSplitState.getSplitId(); + + for (RingRange ringRange : ringRanges) { + Token startToken = + clusterMetadata.newToken(ringRange.getStart().toString()); + Token endToken = clusterMetadata.newToken(ringRange.getEnd().toString()); + if (ringRange.isWrapping()) { + // A wrapping range is one that overlaps from the end of the partitioner + // range and its + // start (ie : when the start token of the split is greater than the end + // token) + // We need to generate two queries here : one that goes from the start + // token to the end + // of + // the partitioner range, and the other from the start of the + // partitioner range to the + // end token of the split. 
+ + addRecordsToOutput( + session.execute( + getLowestSplitQuery( + query, partitionKey, ringRange.getEnd())), + recordsBySplit, + cassandraSplitId, + ringRange); + addRecordsToOutput( + session.execute( + getHighestSplitQuery( + query, partitionKey, ringRange.getStart())), + recordsBySplit, + cassandraSplitId, + ringRange); + } else { + addRecordsToOutput( + session.execute( + preparedStatement + .bind() + .setToken(0, startToken) + .setToken(1, endToken)), + recordsBySplit, + cassandraSplitId, + ringRange); + } + cassandraSplitState.markRingRangeAsFinished(ringRange); + } + // put the already read split to finished splits + finishedSplits.add(cassandraSplitState.getSplitId()); + // for reentrant calls: if fetch is woken up, + // do not reprocess the already processed splits + unprocessedSplits.remove(cassandraSplitState); + } catch (Exception ex) { + LOG.error("Error while reading split ", ex); + } + } else { + finishedSplits.add(cassandraSplitState.getSplitId()); + } + } + return new RecordsBySplits<>(recordsBySplit, finishedSplits); + } + + private String getPartitionKey(Metadata clusterMetadata) { + Matcher queryMatcher = Pattern.compile(SELECT_REGEXP).matcher(query); + if (!queryMatcher.matches()) { + throw new IllegalStateException( + String.format( + "Failed to extract keyspace and table out of the provided query: %s", + query)); + } + String keyspace = queryMatcher.group(1); + String table = queryMatcher.group(2); + return clusterMetadata.getKeyspace(keyspace).getTable(table).getPartitionKey().stream() + .map(ColumnMetadata::getName) + .collect(Collectors.joining(",")); + } + + @Override + public void wakeUp() { + wakeup.compareAndSet(false, true); + } + + @Override + public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChanges) { + for (CassandraSplit cassandraSplit : splitsChanges.splits()) { + unprocessedSplits.add(cassandraSplit.toSplitState()); + } + } + + @VisibleForTesting + static String getHighestSplitQuery(String query, String 
partitionKey, BigInteger highest) { + return generateQuery( + query, partitionKey, highest, " (token(%s) >= %d) AND", " WHERE (token(%s) >= %d)"); + } + + @VisibleForTesting + static String getLowestSplitQuery(String query, String partitionKey, BigInteger lowest) { + return generateQuery( + query, partitionKey, lowest, " (token(%s) < %d) AND", " WHERE (token(%s) < %d)"); + } + + @VisibleForTesting + static String generateRangeQuery(String query, String partitionKey) { + return generateQuery( + query, + partitionKey, + null, + " (token(%s) >= ?) AND (token(%s) < ?) AND", + " WHERE (token(%s) >= ?) AND (token(%s) < ?)"); + } + + private static String generateQuery( Review Comment: can we get some before/after examples for this. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. + wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { + // allow to interrupt the reading of splits as requested in the API + if (wakeup.get()) { Review Comment: Which calls in this method are blocking? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. + */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplitState> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + + public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { + // need a thread safe set + this.unprocessedSplits = ConcurrentHashMap.newKeySet(); + this.query = query; + cluster = clusterBuilder.getCluster(); + session = cluster.connect(); + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + Metadata clusterMetadata = cluster.getMetadata(); + + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + // Set wakeup to false to start consuming. 
+ wakeup.compareAndSet(true, false); + for (CassandraSplitState cassandraSplitState : unprocessedSplits) { + // allow to interrupt the reading of splits as requested in the API + if (wakeup.get()) { + break; + } + if (!cassandraSplitState.isEmpty()) { + try { + final Set<RingRange> ringRanges = + cassandraSplitState.getUnprocessedRingRanges(); + final String cassandraSplitId = cassandraSplitState.getSplitId(); + + for (RingRange ringRange : ringRanges) { + Token startToken = + clusterMetadata.newToken(ringRange.getStart().toString()); + Token endToken = clusterMetadata.newToken(ringRange.getEnd().toString()); + if (ringRange.isWrapping()) { + // A wrapping range is one that overlaps from the end of the partitioner + // range and its + // start (ie : when the start token of the split is greater than the end + // token) + // We need to generate two queries here : one that goes from the start + // token to the end + // of + // the partitioner range, and the other from the start of the + // partitioner range to the + // end token of the split. Review Comment: Why not model this as 2 separate ranges from the beginning? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; Review Comment: Do we really need to hard-code the class here? 
If we can't access the class to infer the name, would it make sense to just check whether the returned name _contains_ "Murmur3Partitioner"? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + + private final SplitEnumeratorContext<CassandraSplit> enumeratorContext; + private final CassandraEnumeratorState state; + private final Cluster cluster; + + public CassandraSplitEnumerator( + SplitEnumeratorContext<CassandraSplit> enumeratorContext, + CassandraEnumeratorState state, + ClusterBuilder clusterBuilder) { + this.enumeratorContext = enumeratorContext; + this.state = state == null ? 
new CassandraEnumeratorState() : state /* snapshot restore*/; + this.cluster = clusterBuilder.getCluster(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + assignUnprocessedSplitsToReader(subtaskId); + } + + @Override + public void start() { + // discover the splits and update unprocessed splits and then assign them. + // There is only an initial splits discovery, no periodic discovery. + enumeratorContext.callAsync( + this::discoverSplits, + (splits, throwable) -> { + LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); + state.addNewSplits(splits, enumeratorContext.currentParallelism()); + }); + } + + private List<CassandraSplit> discoverSplits() { + final int numberOfSplits = enumeratorContext.currentParallelism(); + final Metadata clusterMetadata = cluster.getMetadata(); + final String partitioner = clusterMetadata.getPartitioner(); + final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); + if (MURMUR3PARTITIONER.equals(partitioner)) { + LOG.info("Murmur3Partitioner detected, splitting"); + List<BigInteger> tokens = + clusterMetadata.getTokenRanges().stream() + .map( + tokenRange -> + new BigInteger( + tokenRange.getEnd().getValue().toString())) + .collect(Collectors.toList()); + return splitsGenerator.generateSplits(numberOfSplits, tokens); + } else { + // Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ + // see + // https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html + LOG.warn( + "The current Cassandra partitioner is {}, only Murmur3Partitioner is supported " Review Comment: What about the random partitioner? Shouldn't it be possible to create multiple splits? 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits 
Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + + private final SplitEnumeratorContext<CassandraSplit> enumeratorContext; + private final CassandraEnumeratorState state; + private final Cluster cluster; + + public CassandraSplitEnumerator( + SplitEnumeratorContext<CassandraSplit> enumeratorContext, + CassandraEnumeratorState state, + ClusterBuilder clusterBuilder) { + this.enumeratorContext = enumeratorContext; + this.state = state == null ? new CassandraEnumeratorState() : state /* snapshot restore*/; + this.cluster = clusterBuilder.getCluster(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + assignUnprocessedSplitsToReader(subtaskId); + } + + @Override + public void start() { + // discover the splits and update unprocessed splits and then assign them. + // There is only an initial splits discovery, no periodic discovery. 
+ enumeratorContext.callAsync( + this::discoverSplits, + (splits, throwable) -> { + LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); + state.addNewSplits(splits, enumeratorContext.currentParallelism()); + }); + } + + private List<CassandraSplit> discoverSplits() { + final int numberOfSplits = enumeratorContext.currentParallelism(); + final Metadata clusterMetadata = cluster.getMetadata(); + final String partitioner = clusterMetadata.getPartitioner(); + final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); + if (MURMUR3PARTITIONER.equals(partitioner)) { + LOG.info("Murmur3Partitioner detected, splitting"); + List<BigInteger> tokens = + clusterMetadata.getTokenRanges().stream() + .map( + tokenRange -> + new BigInteger( + tokenRange.getEnd().getValue().toString())) + .collect(Collectors.toList()); + return splitsGenerator.generateSplits(numberOfSplits, tokens); + } else { + // Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ + // see + // https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html + LOG.warn( + "The current Cassandra partitioner is {}, only Murmur3Partitioner is supported " + + "for splitting, using an single split", Review Comment: ```suggestion + "for splitting, using a single split", ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator; +import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReader; +import org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import 
org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bounded source to read from Cassandra and return a collection of entities as {@code + * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code + * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described + * in <a + * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/"> + * Cassandra object mapper</a>). + * + * <p>To use it, do the following: + * + * <pre>{@code + * ClusterBuilder clusterBuilder = new ClusterBuilder() { + * @Override + * protected Cluster buildCluster(Cluster.Builder builder) { + * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT)) + * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL)) + * .withSocketOptions(new SocketOptions() + * .setConnectTimeoutMillis(CONNECT_TIMEOUT) + * .setReadTimeoutMillis(READ_TIMEOUT)) + * .build(); + * } + * }; + * Source cassandraSource = new CassandraSource(clusterBuilder, + * Pojo.class, + * "select ... 
from KEYSPACE.TABLE ...;", + * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + * + * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), + * "CassandraSource"); + * }</pre> + */ +@PublicEvolving +public class CassandraSource<OUT> + implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> { + + public static final String CQL_PROHIBITTED_CLAUSES_REGEXP = + "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"; + private static final long serialVersionUID = 7773196541275567433L; + + private final ClusterBuilder clusterBuilder; + private final Class<OUT> pojoClass; + private final String query; + private final MapperOptions mapperOptions; + + public CassandraSource( + ClusterBuilder clusterBuilder, + Class<OUT> pojoClass, + String query, + MapperOptions mapperOptions) { + checkNotNull(clusterBuilder, "ClusterBuilder required but not provided"); + checkNotNull(pojoClass, "POJO class required but not provided"); + checkQueryValidity(query); + this.clusterBuilder = clusterBuilder; + ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.pojoClass = pojoClass; + this.query = query; + this.mapperOptions = mapperOptions; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Internal + @Override + public SourceReader<OUT, CassandraSplit> createReader(SourceReaderContext readerContext) { + return new CassandraSourceReader<>( + readerContext, clusterBuilder, pojoClass, query, mapperOptions); + } + + @Internal + @Override + public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> createEnumerator( + SplitEnumeratorContext<CassandraSplit> enumContext) { + return new CassandraSplitEnumerator(enumContext, null, clusterBuilder); + } + + @Internal + @Override + public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<CassandraSplit> enumContext, + 
CassandraEnumeratorState enumCheckpoint) { + return new CassandraSplitEnumerator(enumContext, enumCheckpoint, clusterBuilder); + } + + @Internal + @Override + public SimpleVersionedSerializer<CassandraSplit> getSplitSerializer() { + return CassandraSplitSerializer.INSTANCE; + } + + @Internal + @Override + public SimpleVersionedSerializer<CassandraEnumeratorState> getEnumeratorCheckpointSerializer() { + return CassandraEnumeratorStateSerializer.INSTANCE; + } + + @Override + public TypeInformation<OUT> getProducedType() { + return TypeInformation.of(pojoClass); + } + + @VisibleForTesting + public static void checkQueryValidity(String query) { + checkNotNull(query, "query required but not provided"); + checkState( + query.matches(CassandraSplitReader.SELECT_REGEXP), + "query must be of the form select ... from keyspace.table ...;"); + checkState( + !query.matches(CQL_PROHIBITTED_CLAUSES_REGEXP), + "query must not contain aggregate or order clauses because they will be done per split. " + + "So they will be incorrect after merging the splits"); Review Comment: Users won't understand why splits are being merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
