Ankur Goenka created BEAM-6179:
----------------------------------

             Summary: Batch size estimation failing
                 Key: BEAM-6179
                 URL: https://issues.apache.org/jira/browse/BEAM-6179
             Project: Beam
          Issue Type: Bug
          Components: runner-flink, sdk-py-harness
            Reporter: Ankur Goenka
            Assignee: Ankur Goenka


Batch size estimation fails on Flink when running a pipeline with a 13 MB input, with the following error:
ValueError: On entry to DLASCL parameter number 4 had an illegal value 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 48: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 135, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 221, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 237, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 480, in process_bundle
    ].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 125, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 182, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 183, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 709, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 420, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 794, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 709, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 420, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 794, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 709, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 420, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 794, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 709, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 420, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 794, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 724, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 420, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 770, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
  File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 398, in process
    self._batch_size = self._batch_size_estimator.next_batch_size()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 351, in next_batch_size
    a, b = self.linear_regression(xs, ys)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 321, in linear_regression_numpy
    b, a = np.polyfit(xs, ys, 1, w=weight)
  File "/usr/local/lib/python2.7/site-packages/numpy/lib/polynomial.py", line 585, in polyfit
    c, resids, rank, s = lstsq(lhs, rhs, rcond)
  File "/usr/local/lib/python2.7/site-packages/numpy/linalg/linalg.py", line 1957, in lstsq
    0, work, lwork, iwork, 0)
ValueError: On entry to DLASCL parameter number 4 had an illegal value [while running 'Analyze/RunPhase[0]/BatchAnalyzerInputs/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)']

        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
        at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to