fapaul commented on code in PR #19588: URL: https://github.com/apache/flink/pull/19588#discussion_r863672111
########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; 
+import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.NumberSequenceIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.util.InstantiationUtil.serializeObject; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +@Public +public class GeneratorSource<OUT> + implements Source<OUT, NumberSequenceSplit<OUT>, Collection<NumberSequenceSplit<OUT>>>, + ResultTypeQueryable<OUT> { + + private static final long serialVersionUID = 1L; + + /** The end number in the sequence, inclusive. */ + private final long count; + + private final TypeInformation<OUT> typeInfo; + + public final MapFunction<Long, OUT> mapFunction; + + public final Boundedness boundedness; + + public long getCount() { + return count; + } + + private GeneratorSource( + MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) { + this.mapFunction = checkNotNull(mapFunction); + checkArgument(count > 0, "count must be > 0"); + this.count = count; + this.typeInfo = typeInfo; + boundedness = + count == Long.MAX_VALUE ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; + } + + public static <OUT> GeneratorSource<OUT> from( + MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) { Review Comment: I would not use `MapFunction` here but rather the standard Java `Function`. ########## flink-core/src/main/java/org/apache/flink/util/NewSplittableIterator.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * Abstract base class for iterators that can split themselves into multiple disjoint iterators. The + * union of these iterators returns the original iterator values. + * + * @param <T> The type of elements returned by the iterator. + */ +@Public +public abstract class NewSplittableIterator<T> implements Iterator<T>, Serializable { Review Comment: The name is not perfect ;) ########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java: ########## @@ -46,25 +46,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -/** - * A data source that produces a sequence of numbers (longs). This source is useful for testing and - * for cases that just need a stream of N events of any kind. - * - * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel - * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is - * limited to one, this will produce one sequence in order. - * - * <p>This source is always bounded. 
For very long sequences (for example over the entire domain of - * long integer values), user may want to consider executing the application in a streaming manner, - * because, despite the fact that the produced stream is bounded, the end bound is pretty far away. - */ @Public -public class NumberSequenceSource - implements Source< - Long, - NumberSequenceSource.NumberSequenceSplit, - Collection<NumberSequenceSource.NumberSequenceSplit>>, - ResultTypeQueryable<Long> { +public class GeneratorSource<OUT> Review Comment: Just FYI: you need to keep the `NumberSequenceSource` because it is `Public` and can only be changed in a breaking way with a new major release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
