zentol commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1097492206
##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##########

@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.Projections;
+import com.mongodb.client.model.Sorts;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
+
+/**
+ * Sample Partitioner.
+ *
+ * <p>Samples the collection to generate partitions.
+ *
+ * <p>Uses the average document size to split the collection into average-sized chunks.
+ *
+ * <p>The partitioner samples the collection, then projects and sorts by the partition fields, and
+ * uses every {@code samplesPerPartition}-th sample as a partition boundary.
+ *
+ * <ul>
+ *   <li>scan.partition.size: The average size (in MB) of each partition. Note: uses the average
+ *       document size to determine the number of documents per partition, so partitions may not
+ *       be evenly sized. Defaults to 64mb.
+ *   <li>scan.partition.samples: The number of samples to take per partition. Defaults to 10. The
+ *       total number of samples taken is calculated as {@code samples per partition * (count of
+ *       documents / number of documents per partition)}.
+ * </ul>
+ */
+@Internal
+public class MongoSampleSplitter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoSampleSplitter.class);
+
+    public static final MongoSampleSplitter INSTANCE = new MongoSampleSplitter();
+
+    private MongoSampleSplitter() {}
+
+    public Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
+        MongoReadOptions readOptions = splitContext.getReadOptions();
+        MongoNamespace namespace = splitContext.getMongoNamespace();
+
+        long count = splitContext.getCount();
+        long partitionSizeInBytes = readOptions.getPartitionSize().getBytes();
+        int samplesPerPartition = readOptions.getSamplesPerPartition();
+
+        long avgObjSizeInBytes = splitContext.getAvgObjSize();
+        if (avgObjSizeInBytes == 0L) {
+            LOG.info(
+                    "{} seems to be an empty collection, returning a single partition.", namespace);
+            return MongoSingleSplitter.INSTANCE.split(splitContext);
+        }
+
+        long numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes;
+        if (numDocumentsPerPartition >= count) {
+            LOG.info(
+                    "Fewer documents ({}) than the number of documents per partition ({}), returning a single partition.",
+                    count,
+                    numDocumentsPerPartition);
+            return MongoSingleSplitter.INSTANCE.split(splitContext);
+        }
+
+        int numberOfSamples =
+                (int) Math.ceil((samplesPerPartition * count * 1.0d) / numDocumentsPerPartition);

Review Comment:
```suggestion
        int numberOfPartitions = (int) Math.ceil(count * 1.0d / numDocumentsPerPartition);
        int numberOfSamples = samplesPerPartition * numberOfPartitions;
```
This made it easier for me to understand.
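For concreteness, here is a standalone sketch of the arithmetic behind this suggestion. The document count and average object size are made-up illustration values; the partition size and samples-per-partition match the defaults named in the Javadoc above:

```java
public class SampleCountArithmetic {
    public static void main(String[] args) {
        long count = 1_000_000L;                       // made-up document count
        long avgObjSizeInBytes = 1024L;                // made-up average document size (1 KB)
        long partitionSizeInBytes = 64L * 1024 * 1024; // default scan.partition.size (64mb)
        int samplesPerPartition = 10;                  // default scan.partition.samples

        // 64 MB / 1 KB = 65536 documents fit into one partition.
        long numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes;

        // ceil(1000000 / 65536) = 16 partitions, hence 10 * 16 = 160 samples.
        int numberOfPartitions = (int) Math.ceil(count * 1.0d / numDocumentsPerPartition);
        int numberOfSamples = samplesPerPartition * numberOfPartitions;

        System.out.println(numDocumentsPerPartition); // 65536
        System.out.println(numberOfPartitions);       // 16
        System.out.println(numberOfSamples);          // 160
    }
}
```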
##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##########

@@ -0,0 +1,140 @@
+    public Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
+        MongoReadOptions readOptions = splitContext.getReadOptions();
+        MongoNamespace namespace = splitContext.getMongoNamespace();
+
+        long count = splitContext.getCount();

Review Comment:
```suggestion
        long totalNumDocuments = splitContext.getCount();
```
##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##########

@@ -0,0 +1,140 @@
+@Internal
+public class MongoSampleSplitter {

Review Comment:
I'd love to see a test for this splitter. It's currently too easy to make a refactoring that accidentally changes the output.
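As a starting point, such a test could exercise the boundary selection without a live MongoDB. Everything below is a sketch under assumptions: it presumes the sampling loop is extracted into a package-private helper (here called `createSplits`, which does not exist in the PR as-is) and that `MongoScanSourceSplit` exposes its bounds via `getMin()`/`getMax()`:

```java
import static org.assertj.core.api.Assertions.assertThat;

import com.mongodb.MongoNamespace;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.junit.jupiter.api.Test;

class MongoSampleSplitterTest {

    @Test
    void samplesAreGroupedIntoContiguousSplits() {
        MongoNamespace namespace = new MongoNamespace("test_db.test_collection");
        int samplesPerPartition = 2;

        // Six already-sorted samples; with 2 samples per partition we expect 3 splits.
        List<BsonDocument> samples = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            samples.add(new BsonDocument("_id", new BsonInt32(i)));
        }

        // Hypothetical extracted helper, not part of the current PR.
        List<MongoScanSourceSplit> splits =
                MongoSampleSplitter.createSplits(samples, samplesPerPartition, namespace);

        assertThat(splits).hasSize(3);
        // Adjacent splits must share a boundary so that no documents are skipped.
        for (int i = 1; i < splits.size(); i++) {
            assertThat(splits.get(i).getMin()).isEqualTo(splits.get(i - 1).getMax());
        }
    }
}
```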
##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##########

@@ -0,0 +1,140 @@
+        List<BsonDocument> samples =
+                splitContext
+                        .getMongoCollection()
+                        .aggregate(
+                                Arrays.asList(
+                                        Aggregates.sample(numberOfSamples),
+                                        Aggregates.project(Projections.include(ID_FIELD)),
+                                        Aggregates.sort(Sorts.ascending(ID_FIELD))))
+                        .allowDiskUse(true)
+                        .into(new ArrayList<>());
+
+        List<MongoScanSourceSplit> sourceSplits = new ArrayList<>();
+        BsonDocument partitionStart = new BsonDocument(ID_FIELD, BSON_MIN_KEY);
+        int splitNum = 0;
+        for (int i = 0; i < samples.size(); i++) {
+            if (i % samplesPerPartition == 0 || i == samples.size() - 1) {
+                sourceSplits.add(
+                        createSplit(namespace, splitNum++, partitionStart, samples.get(i)));
+                partitionStart = samples.get(i);
+            }
+        }

Review Comment:
```suggestion
        BsonDocument partitionStart = samples.get(0);
        int splitNum = 0;
        for (int i = samplesPerPartition - 1; i < samples.size(); i += samplesPerPartition) {
            sourceSplits.add(
                    createSplit(namespace, splitNum++, partitionStart, samples.get(i)));
            partitionStart = samples.get(i);
        }
```
Ideally, this is how the loop would look. We just have a list of points and split it into `numPartition` blocks, with no need for special handling of MIN/MAX or for explicitly completing the right bound.
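To see what the suggested loop does in isolation, here is a self-contained sketch that slices a list of sample points into blocks. Plain integers stand in for the sorted sample documents, and the print statement stands in for `createSplit`:

```java
import java.util.ArrayList;
import java.util.List;

public class BoundaryLoopSketch {
    public static void main(String[] args) {
        // Stand-in for the sorted sample documents; here just the integers 0..9.
        List<Integer> samples = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            samples.add(i);
        }

        int samplesPerPartition = 3;

        // The suggested loop: every samplesPerPartition-th sample closes a split.
        Integer partitionStart = samples.get(0);
        int splitNum = 0;
        for (int i = samplesPerPartition - 1; i < samples.size(); i += samplesPerPartition) {
            System.out.printf("split %d: [%d, %d)%n", splitNum++, partitionStart, samples.get(i));
            partitionStart = samples.get(i);
        }
        // Prints:
        //   split 0: [0, 2)
        //   split 1: [2, 5)
        //   split 2: [5, 8)
        // Note that the trailing sample (9) closes no split here, so the sample
        // list would need to contain the collection bounds at compatible positions
        // for the splits to cover the whole collection.
    }
}
```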
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]