[
https://issues.apache.org/jira/browse/BEAM-6179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ankur Goenka resolved BEAM-6179.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.10.0
> Batch size estimation failing
> -----------------------------
>
> Key: BEAM-6179
> URL: https://issues.apache.org/jira/browse/BEAM-6179
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-py-harness
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Major
> Fix For: 2.10.0
>
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Batch size estimation fails on Flink when running a pipeline with a 13 MB
> input, producing the following error:
> ValueError: On entry to DLASCL parameter number 4 had an illegal value
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
> received from SDK harness for instruction 48: Traceback (most recent call
> last):
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 135, in _execute
> response = task()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 170, in <lambda>
> self._execute(lambda: worker.do_instruction(work), work)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 221, in do_instruction
> request.instruction_id)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 237, in process_bundle
> bundle_processor.process_bundle(instruction_id)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 480, in process_bundle
> ].process_encoded(data.data)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 125, in process_encoded
> self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 182, in
> apache_beam.runners.worker.operations.Operation.output
> def output(self, windowed_value, output_index=0):
> File "apache_beam/runners/worker/operations.py", line 183, in
> apache_beam.runners.worker.operations.Operation.output
> cython.cast(Receiver,
> self.receivers[output_index]).receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 89, in
> apache_beam.runners.worker.operations.ConsumerSet.receive
> cython.cast(Operation, consumer).process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 497, in
> apache_beam.runners.worker.operations.DoOperation.process
> with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 498, in
> apache_beam.runners.worker.operations.DoOperation.process
> self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 680, in
> apache_beam.runners.common.DoFnRunner.receive
> self.process(windowed_value)
> File "apache_beam/runners/common.py", line 686, in
> apache_beam.runners.common.DoFnRunner.process
> self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 709, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
> raise
> File "apache_beam/runners/common.py", line 684, in
> apache_beam.runners.common.DoFnRunner.process
> self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 420, in
> apache_beam.runners.common.SimpleInvoker.invoke_process
> output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 794, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 89, in
> apache_beam.runners.worker.operations.ConsumerSet.receive
> cython.cast(Operation, consumer).process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 497, in
> apache_beam.runners.worker.operations.DoOperation.process
> with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 498, in
> apache_beam.runners.worker.operations.DoOperation.process
> self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 680, in
> apache_beam.runners.common.DoFnRunner.receive
> self.process(windowed_value)
> File "apache_beam/runners/common.py", line 686, in
> apache_beam.runners.common.DoFnRunner.process
> self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 709, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
> raise
> File "apache_beam/runners/common.py", line 684, in
> apache_beam.runners.common.DoFnRunner.process
> self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 420, in
> apache_beam.runners.common.SimpleInvoker.invoke_process
> output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 794, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 89, in
> apache_beam.runners.worker.operations.ConsumerSet.receive
> cython.cast(Operation, consumer).process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 497, in
> apache_beam.runners.worker.operations.DoOperation.process
> with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 498, in
> apache_beam.runners.worker.operations.DoOperation.process
> self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 680, in
> apache_beam.runners.common.DoFnRunner.receive
> self.process(windowed_value)
> File "apache_beam/runners/common.py", line 686, in
> apache_beam.runners.common.DoFnRunner.process
> self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 709, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
> raise
> File "apache_beam/runners/common.py", line 684, in
> apache_beam.runners.common.DoFnRunner.process
> self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 420, in
> apache_beam.runners.common.SimpleInvoker.invoke_process
> output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 794, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 89, in
> apache_beam.runners.worker.operations.ConsumerSet.receive
> cython.cast(Operation, consumer).process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 497, in
> apache_beam.runners.worker.operations.DoOperation.process
> with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 498, in
> apache_beam.runners.worker.operations.DoOperation.process
> self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 680, in
> apache_beam.runners.common.DoFnRunner.receive
> self.process(windowed_value)
> File "apache_beam/runners/common.py", line 686, in
> apache_beam.runners.common.DoFnRunner.process
> self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 709, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
> raise
> File "apache_beam/runners/common.py", line 684, in
> apache_beam.runners.common.DoFnRunner.process
> self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 420, in
> apache_beam.runners.common.SimpleInvoker.invoke_process
> output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 794, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 89, in
> apache_beam.runners.worker.operations.ConsumerSet.receive
> cython.cast(Operation, consumer).process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 497, in
> apache_beam.runners.worker.operations.DoOperation.process
> with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 498, in
> apache_beam.runners.worker.operations.DoOperation.process
> self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 680, in
> apache_beam.runners.common.DoFnRunner.receive
> self.process(windowed_value)
> File "apache_beam/runners/common.py", line 686, in
> apache_beam.runners.common.DoFnRunner.process
> self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 724, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
> raise_with_traceback(new_exn)
> File "apache_beam/runners/common.py", line 684, in
> apache_beam.runners.common.DoFnRunner.process
> self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 420, in
> apache_beam.runners.common.SimpleInvoker.invoke_process
> output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 770, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> for result in results:
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line
> 398, in process
> self._batch_size = self._batch_size_estimator.next_batch_size()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line
> 351, in next_batch_size
> a, b = self.linear_regression(xs, ys)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line
> 321, in linear_regression_numpy
> b, a = np.polyfit(xs, ys, 1, w=weight)
> File "/usr/local/lib/python2.7/site-packages/numpy/lib/polynomial.py", line
> 585, in polyfit
> c, resids, rank, s = lstsq(lhs, rhs, rcond)
> File "/usr/local/lib/python2.7/site-packages/numpy/linalg/linalg.py", line
> 1957, in lstsq
> 0, work, lwork, iwork, 0)
> ValueError: On entry to DLASCL parameter number 4 had an illegal value [while
> running
> 'Analyze/RunPhase[0]/BatchAnalyzerInputs/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)']
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:748)
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)