XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1190780451


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -49,10 +50,18 @@
     /**
      * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
      *
+     * <p>He requires that the end must be greater than the start and that the 
total number cannot
+     * be greater than max-1.
+     *

Review Comment:
   ```suggestion
        * <p>The {@code SequenceGenerator} requires that the {@code end} must be greater than the {@code start} and that the total number cannot be greater than {@code Long.MAX_VALUE - 1}.
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -111,11 +120,20 @@ public boolean hasNext() {
         return !this.valuesToEmit.isEmpty();
     }
 
-    private static int safeDivide(long left, long right) {
-        Preconditions.checkArgument(right > 0);
-        Preconditions.checkArgument(left >= 0);
-        Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
-        return (int) (left / right);
+    private static long safeDivide(long totalRows, long stepSize) {
+        Preconditions.checkArgument(stepSize > 0, "cannot be equal to 0");
+        Preconditions.checkArgument(totalRows >= 0, "Cannot be less than 0");
+        return totalRows / stepSize;
+    }

Review Comment:
   It feels like this method is not necessary anymore, considering that we have 
the invariant of `MAX_VALUE-1` in this class. We could remove it and just use 
`totalNumberOfElements / stepSize` in the calling `open` method.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -49,10 +50,18 @@
     /**
      * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
      *
+     * <p>He requires that the end must be greater than the start and that the 
total number cannot
+     * be greater than max-1.
+     *
      * @param start Start of the range of numbers to emit.
      * @param end End of the range of numbers to emit.
      */
     public SequenceGenerator(long start, long end) {

Review Comment:
   ```suggestion
       protected SequenceGenerator(long start, long end) {
   ```



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGeneratorTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link SequenceGenerator}. */
+public class SequenceGeneratorTest {
+
+    @Test
+    public void testStartGreaterThanEnd() {
+        assertThatThrownBy(
+                        () -> {
+                            final long start = 30;
+                            final long end = 20;
+                            SequenceGenerator.longGenerator(start, end);
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "The start value cannot be greater than the 
end value."));
+    }
+
+    @Test
+    public void testTotalQuantity() {

Review Comment:
   This test method can be split into three different test methods 
`testTooLargeRange`, `testMaxRange` and `testSequenceCreation`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -111,11 +120,20 @@ public boolean hasNext() {
         return !this.valuesToEmit.isEmpty();
     }
 
-    private static int safeDivide(long left, long right) {
-        Preconditions.checkArgument(right > 0);
-        Preconditions.checkArgument(left >= 0);
-        Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
-        return (int) (left / right);
+    private static long safeDivide(long totalRows, long stepSize) {
+        Preconditions.checkArgument(stepSize > 0, "cannot be equal to 0");
+        Preconditions.checkArgument(totalRows >= 0, "Cannot be less than 0");
+        return totalRows / stepSize;
+    }
+
+    @VisibleForTesting
+    public long getStart() {
+        return start;
+    }
+
+    @VisibleForTesting
+    public long getEnd() {
+        return end;

Review Comment:
   We could introduce a package-private constructor in `SequenceGenerator` that 
accepts a third parameter `numberOfElementsLimit` and annotate it with 
`@VisibleForTesting`. The public constructor would call this constructor with 
the default value set to `Long.MAX_VALUE - 1`.
   
   That way, we could test the actual sequence generation instead of just 
checking the parameters (even for the test case where we want to test the 
number of elements being exactly at the limit). WDYT?
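
A minimal sketch of that constructor layout, using a hypothetical stand-in class rather than the real `SequenceGenerator`. The class name, the `values()` helper, and the choice to count elements as `inclEnd - inclStart + 1` (so a single-element range is allowed) are assumptions made for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for SequenceGenerator; names and semantics are assumptions. */
class BoundedSequenceSketch {
    static final long DEFAULT_LIMIT = Long.MAX_VALUE - 1;

    private final long inclStart;
    private final long span; // inclEnd - inclStart, i.e. element count minus one

    /** Public entry point: delegates with the default limit. */
    public BoundedSequenceSketch(long inclStart, long inclEnd) {
        this(inclStart, inclEnd, DEFAULT_LIMIT);
    }

    /** Package-private so that tests in the same package can pass a small limit. */
    BoundedSequenceSketch(long inclStart, long inclEnd, long numberOfElementsLimit) {
        if (inclStart > inclEnd) {
            throw new IllegalArgumentException(
                    String.format(
                            "The start value (%s) cannot be greater than the end value (%s).",
                            inclStart, inclEnd));
        }
        // Because inclStart <= inclEnd, a negative difference can only mean long overflow.
        long difference = inclEnd - inclStart;
        // The element count is difference + 1; comparing with >= avoids overflowing the + 1.
        if (difference < 0 || difference >= numberOfElementsLimit) {
            throw new IllegalArgumentException(
                    String.format(
                            "The total size of range (%s, %s) exceeds the maximum limit: %s.",
                            inclStart, inclEnd, numberOfElementsLimit));
        }
        this.inclStart = inclStart;
        this.span = difference;
    }

    /** Materializes the whole range; only sensible for the small ranges used in tests. */
    List<Long> values() {
        List<Long> out = new ArrayList<>();
        // Iterating over offsets avoids overflow when the range ends at Long.MAX_VALUE.
        for (long offset = 0; offset <= span; offset++) {
            out.add(inclStart + offset);
        }
        return out;
    }
}
```

With a limit of 3, a test can generate a range that sits exactly at the limit and compare the emitted values, and then check that a range of four elements is rejected, without ever allocating anything close to `Long.MAX_VALUE - 1` elements.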



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -49,10 +50,18 @@
     /**
      * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
      *
+     * <p>He requires that the end must be greater than the start and that the 
total number cannot
+     * be greater than max-1.
+     *
      * @param start Start of the range of numbers to emit.
      * @param end End of the range of numbers to emit.
      */
     public SequenceGenerator(long start, long end) {

Review Comment:
   ```suggestion
       public SequenceGenerator(long inclStart, long inclEnd) {
   ```
   Can we make the variable/field names more descriptive? For ranges, it's 
always unclear whether the limits are included in the range or not



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGeneratorTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;

Review Comment:
   We want to use JUnit 5.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGeneratorTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link SequenceGenerator}. */
+public class SequenceGeneratorTest {
+
+    @Test
+    public void testStartGreaterThanEnd() {
+        assertThatThrownBy(
+                        () -> {
+                            final long start = 30;
+                            final long end = 20;
+                            SequenceGenerator.longGenerator(start, end);
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "The start value cannot be greater than the 
end value."));

Review Comment:
   ```suggestion
           final int start = 30;
           final int end = 20;
           assertThatThrownBy(() -> SequenceGenerator.longGenerator(start, end))
                   .isInstanceOf(IllegalArgumentException.class)
                   .hasMessage(
                           "The start value (%s) cannot be greater than the end value (%s).",
                           start, end);
   ```
   The `anyCauseMatches` is not necessary. We should be as explicit as possible 
in the test cases. The same applies to the other tests.
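
To illustrate why the direct check is stricter, here is a plain-Java sketch with no test library involved. `matchesAnyCause` is a hypothetical helper modelled on the general idea of a cause-chain matcher (it is not Flink's implementation), and `isDirectly` mimics checking the thrown exception itself:

```java
/** Hypothetical helpers for illustration; not part of Flink or AssertJ. */
class CauseCheckSketch {

    /** Loose check: passes if any throwable in the cause chain has the type and message part. */
    static boolean matchesAnyCause(
            Throwable thrown, Class<? extends Throwable> type, String messagePart) {
        for (Throwable current = thrown; current != null; current = current.getCause()) {
            String message = current.getMessage();
            if (type.isInstance(current) && message != null && message.contains(messagePart)) {
                return true;
            }
        }
        return false;
    }

    /** Strict check: the thrown exception itself must have the type and the exact message. */
    static boolean isDirectly(
            Throwable thrown, Class<? extends Throwable> type, String exactMessage) {
        return type.isInstance(thrown) && exactMessage.equals(thrown.getMessage());
    }
}
```

If the exception were ever wrapped in another exception, or its message changed while keeping a common fragment, the loose check would still pass while the strict one would fail and flag the change.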



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -49,10 +50,18 @@
     /**
      * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
      *
+     * <p>He requires that the end must be greater than the start and that the 
total number cannot
+     * be greater than max-1.
+     *
      * @param start Start of the range of numbers to emit.
      * @param end End of the range of numbers to emit.
      */
     public SequenceGenerator(long start, long end) {
+        Preconditions.checkArgument(
+                start < end, "The start value cannot be greater than the end 
value.");

Review Comment:
   ```suggestion
                   start < end, "The start value (%s) cannot be greater than the end value (%s).", start, end);
   ```
   Error messages should give the user as much insight as possible into what 
caused the error.
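
As a rough sketch of the effect, the snippet below uses a small local `checkArgument` helper as a stand-in for the `Preconditions` call in the suggestion. The helper and the `RangeCheckSketch` class are assumptions for illustration and use `String.format` for the `%s` placeholders:

```java
/** Hypothetical stand-in for the precondition in the constructor; not Flink code. */
class RangeCheckSketch {

    /** Local helper that throws with a formatted message when the condition is false. */
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    /** Mirrors the check from the suggestion, including the offending values. */
    static void validate(long start, long end) {
        checkArgument(
                start < end,
                "The start value (%s) cannot be greater than the end value (%s).",
                start,
                end);
    }
}
```

Calling `RangeCheckSketch.validate(30, 20)` then fails with a message that names both values, so the user can see which configuration caused the error without attaching a debugger.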



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -49,10 +50,18 @@
     /**
      * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
      *
+     * <p>He requires that the end must be greater than the start and that the 
total number cannot
+     * be greater than max-1.
+     *
      * @param start Start of the range of numbers to emit.
      * @param end End of the range of numbers to emit.
      */
     public SequenceGenerator(long start, long end) {
+        Preconditions.checkArgument(
+                start < end, "The start value cannot be greater than the end 
value.");
+        Preconditions.checkArgument(
+                end - start <= Long.MAX_VALUE - 1,
+                "The total quantity exceeds the maximum limit: Long.MAX_VALUE 
- 1.");

Review Comment:
   ```suggestion
                   "The total size of range (%s, %s) exceeds the maximum limit: Long.MAX_VALUE - 1.", start, end);
   ```



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java:
##########
@@ -93,4 +93,9 @@ public ChangelogMode getChangelogMode() {
     public void applyLimit(long limit) {
         this.numberOfRows = limit;
     }
+
+    @VisibleForTesting
+    public DataGenerator<?>[] getFieldGenerators() {

Review Comment:
   ```suggestion
       DataGenerator<?>[] getFieldGenerators() {
   ```
   The `DataGenTableSourceFactoryTest` seems to be in the wrong package in the 
`test` sources. We could make this method package-private if the test's package 
matched the package structure of the production code.



##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -264,57 +267,63 @@ void testSequenceCheckpointRestore() throws Exception {
     }
 
     @Test
-    void testLackStartForSequence() {
-        assertThatThrownBy(
-                        () -> {
-                            DescriptorProperties descriptor = new 
DescriptorProperties();
-                            descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
-                            descriptor.putString(
-                                    DataGenConnectorOptionsUtil.FIELDS
-                                            + ".f0."
-                                            + DataGenConnectorOptionsUtil.KIND,
-                                    DataGenConnectorOptionsUtil.SEQUENCE);
-                            descriptor.putLong(
-                                    DataGenConnectorOptionsUtil.FIELDS
-                                            + ".f0."
-                                            + DataGenConnectorOptionsUtil.END,
-                                    100);
+    void testDefaultValueForSequence() {

Review Comment:
   Shouldn't we rather create a `SequenceGeneratorVisitorTest`? That would make 
the unit test leaner.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGeneratorTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link SequenceGenerator}. */
+public class SequenceGeneratorTest {
+
+    @Test
+    public void testStartGreaterThanEnd() {
+        assertThatThrownBy(
+                        () -> {
+                            final long start = 30;
+                            final long end = 20;
+                            SequenceGenerator.longGenerator(start, end);
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "The start value cannot be greater than the 
end value."));
+    }
+
+    @Test
+    public void testTotalQuantity() {
+        assertThatThrownBy(
+                        () -> {
+                            final long start = 0;
+                            final long end = Long.MAX_VALUE;
+                            SequenceGenerator.longGenerator(start, end);
+                        })
+                .satisfies(
+                        anyCauseMatches(

Review Comment:
   The comment above applies here as well. We want the test code to be as 
specific as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
