aljoscha commented on a change in pull request #13512:
URL: https://github.com/apache/flink/pull/13512#discussion_r497346029



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code SplitEnumerator} for iterator sources. Simply takes the pre-split 
set of splits and assigns

Review comment:
       I would use `{@link SplitEnumerator}` here, because it creates a link that can be followed in the IDE.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces a sequence of number (longs).

Review comment:
       ```suggestion
    * A data source that produces a sequence of numbers (longs).
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+/**
+ * A simple Source Event indicating that there is no split available for the 
reader (any more).

Review comment:
       ```suggestion
    * A {@link SourceEvent} indicating that there is no split available for the reader (any more).
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Iterator;
+
+/**
+ * A source split that represents a sequence of elements captured in an 
iterator.

Review comment:
       ```suggestion
    * A {@link SourceSplit} that represents a sequence of elements captured in an iterator.
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An {@link SourceReader} that returns the values of an iterator, supplied 
via an

Review comment:
       ```suggestion
    * A {@link SourceReader} that returns the values of an iterator, supplied via an
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An {@link SourceReader} that returns the values of an iterator, supplied 
via an
+ * {@link IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and turning
+ * it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type 
exists to make the
+ *                 conversion between iterator and {@code IteratorSourceSplit} 
type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that 
creates and converts the
+ *                 iterator that produces this reader's elements.
+ */
+public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+               implements SourceReader<E, SplitT> {
+
+       /** The context for this reader, to communicate with the enumerator. */
+       private final SourceReaderContext context;
+
+       /** The availability future. This reader is available as soon as a 
split is assigned. */
+       private final CompletableFuture<Void> availability;
+
+       /** The iterator producing data. Non-null after a split has been 
assigned.
+       * This field is null or non-null always together with the {@link 
#currentSplit} field. */
+       @Nullable
+       private IterT iterator;
+
+       /** The split whose data we return. Non-null after a split has been 
assigned.
+        * This field is null or non-null always together with the {@link 
#iterator} field. */
+       @Nullable
+       private SplitT currentSplit;
+
+       /** The remaining splits. Null means no splits have yet been assigned. 
*/
+       @Nullable
+       private Queue<SplitT> remainingSplits;
+
+       public IteratorSourceReader(SourceReaderContext context) {
+               this.context = checkNotNull(context);
+               this.availability = new CompletableFuture<>();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void start() {
+               // request a split only if we did not get one during restore
+               if (iterator == null) {
+                       context.sendSourceEventToCoordinator(new 
SplitRequestEvent());
+               }
+       }
+
+       @Override
+       public InputStatus pollNext(ReaderOutput<E> output) {
+               if (iterator != null && iterator.hasNext()) {
+                       output.collect(iterator.next());
+                       return InputStatus.MORE_AVAILABLE;
+               }
+               else if (remainingSplits == null) {

Review comment:
       nit: the line-break style of the `if`/`else` branches is inconsistent here; please put `else if` on the same line as the closing brace (`} else if (...) {`).

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces a sequence of number (longs).
+ * This source is useful for testing and for cases that just need a stream of 
N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers. Each sub-sequence will be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order.
+ *
+ * <p>This source is always bounded. For very long sequences (for example over 
the entire domain
+ * of long integer values), user may want to consider executing the 
application in a streaming manner,
+ * because, despite the fact that the produced stream is bounded, the end 
bound is pretty far away.
+ */
+public class NumberSequenceSource implements
+               Source<Long, NumberSequenceSource.NumberSequenceSplit, 
Collection<NumberSequenceSource.NumberSequenceSplit>>,
+               ResultTypeQueryable<Long> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The starting number in the sequence, inclusive. */
+       private final long from;
+
+       /** The end number in the sequence, inclusive. */
+       private final long to;
+
+       /**
+        * Creates a new NumberSequenceSource that produces parallel sequences 
covering the range
+        * 'from' to 'to' (both boundaries are inclusive).
+        */

Review comment:
       ```suggestion
        /**
         * Creates a new {@code NumberSequenceSource} that produces parallel sequences covering the range
         * {@code from} to {@code to} (both boundaries are inclusive).
         */
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An {@link SourceReader} that returns the values of an iterator, supplied 
via an
+ * {@link IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and turning
+ * it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type 
exists to make the
+ *                 conversion between iterator and {@code IteratorSourceSplit} 
type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that 
creates and converts the
+ *                 iterator that produces this reader's elements.
+ */
+public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+               implements SourceReader<E, SplitT> {
+
+       /** The context for this reader, to communicate with the enumerator. */
+       private final SourceReaderContext context;
+
+       /** The availability future. This reader is available as soon as a 
split is assigned. */
+       private final CompletableFuture<Void> availability;
+
+       /** The iterator producing data. Non-null after a split has been 
assigned.
+       * This field is null or non-null always together with the {@link 
#currentSplit} field. */
+       @Nullable
+       private IterT iterator;
+
+       /** The split whose data we return. Non-null after a split has been 
assigned.

Review comment:
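       nit: broken Javadoc formatting in this field comment; please use the regular multi-line layout (`/**` on its own line, aligned ` * ` continuation lines, `*/` on its own line).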

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+/**
+ * A {@code SourceEvent} representing the request for a split, typically sent 
from the

Review comment:
       ```suggestion
    * A {@link SourceEvent} representing the request for a split, typically sent from the
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An {@link SourceReader} that returns the values of an iterator, supplied 
via an
+ * {@link IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and turning
+ * it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type 
exists to make the
+ *                 conversion between iterator and {@code IteratorSourceSplit} 
type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that 
creates and converts the
+ *                 iterator that produces this reader's elements.
+ */
+public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+               implements SourceReader<E, SplitT> {
+
+       /** The context for this reader, to communicate with the enumerator. */
+       private final SourceReaderContext context;
+
+       /** The availability future. This reader is available as soon as a 
split is assigned. */
+       private final CompletableFuture<Void> availability;
+
+       /** The iterator producing data. Non-null after a split has been 
assigned.

Review comment:
       nit: broken Javadoc formatting: the ` * ` continuation line of this field comment is not aligned with the opening `/**`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to