[ 
https://issues.apache.org/jira/browse/BEAM-6179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka resolved BEAM-6179.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.10.0

> Batch size estimation failing
> -----------------------------
>
>                 Key: BEAM-6179
>                 URL: https://issues.apache.org/jira/browse/BEAM-6179
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Batch size estimation fails on Flink when running a pipeline with 13 MB of
> input, with the following error:
> ValueError: On entry to DLASCL parameter number 4 had an illegal value 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> received from SDK harness for instruction 48: Traceback (most recent call 
> last):
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 135, in _execute
>     response = task()
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 170, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 221, in do_instruction
>     request.instruction_id)
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 237, in process_bundle
>     bundle_processor.process_bundle(instruction_id)
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 480, in process_bundle
>     ].process_encoded(data.data)
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 125, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 182, in 
> apache_beam.runners.worker.operations.Operation.output
>     def output(self, windowed_value, output_index=0):
>   File "apache_beam/runners/worker/operations.py", line 183, in 
> apache_beam.runners.worker.operations.Operation.output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 89, in 
> apache_beam.runners.worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 497, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 498, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 680, in 
> apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 686, in 
> apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 709, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise
>   File "apache_beam/runners/common.py", line 684, in 
> apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 420, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>     output_processor.process_outputs(
>   File "apache_beam/runners/common.py", line 794, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 89, in 
> apache_beam.runners.worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 497, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 498, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 680, in 
> apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 686, in 
> apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 709, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise
>   File "apache_beam/runners/common.py", line 684, in 
> apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 420, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>     output_processor.process_outputs(
>   File "apache_beam/runners/common.py", line 794, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 89, in 
> apache_beam.runners.worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 497, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 498, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 680, in 
> apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 686, in 
> apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 709, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise
>   File "apache_beam/runners/common.py", line 684, in 
> apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 420, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>     output_processor.process_outputs(
>   File "apache_beam/runners/common.py", line 794, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 89, in 
> apache_beam.runners.worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 497, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 498, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 680, in 
> apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 686, in 
> apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 709, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise
>   File "apache_beam/runners/common.py", line 684, in 
> apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 420, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>     output_processor.process_outputs(
>   File "apache_beam/runners/common.py", line 794, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 89, in 
> apache_beam.runners.worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 497, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 498, in 
> apache_beam.runners.worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 680, in 
> apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 686, in 
> apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 724, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise_with_traceback(new_exn)
>   File "apache_beam/runners/common.py", line 684, in 
> apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 420, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>     output_processor.process_outputs(
>   File "apache_beam/runners/common.py", line 770, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>     for result in results:
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 
> 398, in process
>     self._batch_size = self._batch_size_estimator.next_batch_size()
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 
> 351, in next_batch_size
>     a, b = self.linear_regression(xs, ys)
>   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 
> 321, in linear_regression_numpy
>     b, a = np.polyfit(xs, ys, 1, w=weight)
>   File "/usr/local/lib/python2.7/site-packages/numpy/lib/polynomial.py", line 
> 585, in polyfit
>     c, resids, rank, s = lstsq(lhs, rhs, rcond)
>   File "/usr/local/lib/python2.7/site-packages/numpy/linalg/linalg.py", line 
> 1957, in lstsq
>     0, work, lwork, iwork, 0)
> ValueError: On entry to DLASCL parameter number 4 had an illegal value [while 
> running 
> 'Analyze/RunPhase[0]/BatchAnalyzerInputs/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)']
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
>       at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
>       at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
>       at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>       at java.lang.Thread.run(Thread.java:748)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to