Yunfeng Zhou created FLINK-26263:
------------------------------------

             Summary: Check data size in LogisticRegression
                 Key: FLINK-26263
                 URL: https://issues.apache.org/jira/browse/FLINK-26263
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.0.0
            Reporter: Yunfeng Zhou

In Flink ML LogisticRegression, training fails if the parallelism is larger than the number of input records. For example, add the following line to `LogisticRegressionTest.testFitAndPredict()`:

```java
env.setParallelism(12);
```

The test case then fails with the following exception:

```
Caused by: java.lang.IllegalArgumentException: bound must be positive
    at java.base/java.util.Random.nextInt(Random.java:388)
    at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.getMiniBatchData(LogisticRegression.java:351)
    at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.onEpochWatermarkIncremented(LogisticRegression.java:381)
    at org.apache.flink.iteration.operator.AbstractWrapperOperator.notifyEpochWatermarkIncrement(AbstractWrapperOperator.java:129)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.lambda$1(AbstractAllRoundWrapperOperator.java:105)
    at org.apache.flink.iteration.operator.OperatorUtils.processOperatorOrUdfIfSatisfy(OperatorUtils.java:79)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.onEpochWatermarkIncrement(AbstractAllRoundWrapperOperator.java:102)
    at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.tryUpdateLowerBound(OperatorEpochWatermarkTracker.java:79)
    at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.onEpochWatermark(OperatorEpochWatermarkTracker.java:63)
    at org.apache.flink.iteration.operator.AbstractWrapperOperator.onEpochWatermarkEvent(AbstractWrapperOperator.java:121)
    at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement(TwoInputAllRoundWrapperOperator.java:77)
    at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement2(TwoInputAllRoundWrapperOperator.java:59)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:834)
```

The cause of this exception is that LogisticRegression does not handle the case where the input data size is 0. When the parallelism exceeds the number of input records, some subtasks receive no data, and `getMiniBatchData` then calls `Random.nextInt` with a non-positive bound, which throws `IllegalArgumentException`. This can be resolved by adding a check for empty input before sampling.
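
To illustrate the kind of check meant here, the following is a minimal sketch and not the actual Flink ML code. `MiniBatchSampler` and its `sample` method are hypothetical names, and the assumption is that the mini-batch is drawn from a locally cached list starting at a random offset. The only point is that `Random.nextInt` is never reached when the local cache is empty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper (not part of Flink ML): samples a mini-batch from locally cached records. */
final class MiniBatchSampler {

    private MiniBatchSampler() {}

    /**
     * Returns up to batchSize records, starting at a random offset and wrapping around.
     * Returns an empty list when the cache is empty, so Random.nextInt(0) is never called.
     */
    static <T> List<T> sample(List<T> cache, int batchSize, Random random) {
        // Guard: a subtask that received no input has nothing to sample from.
        if (cache.isEmpty() || batchSize <= 0) {
            return Collections.emptyList();
        }
        int size = Math.min(batchSize, cache.size());
        // Safe here: cache.size() is strictly positive.
        int start = random.nextInt(cache.size());
        List<T> batch = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            batch.add(cache.get((start + i) % cache.size()));
        }
        return batch;
    }
}
```

With a guard like this, a subtask without data would get an empty batch instead of an exception. The caller would presumably still need to treat an empty batch as a zero contribution to the gradient, so that the other subtasks can proceed with the epoch.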