Adding Silviu, who can comment more.

On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]>
wrote:

> I would like to use Apache Beam as my dataflow engine. Unfortunately for
> me, the input to the dataflow isn't a text file and the result of the
> pipeline is also not a text file. To see how difficult it would be to
> create the desired classes, I've subclassed ptransform.PTransform as well
> as iobase.Source and started on the read side of the problem.  I've cloned
> and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
> on my VM and I am working with the most recent commit 3a56ce7.
>
> Next, I wrote the following code, which closely follows the class Read
> in google/cloud/dataflow/io/iobase.py and the class TextFileSource in
> google/cloud/dataflow/io/fileio.py:
>
> import argparse
> import logging
> import sys
>
> import google.cloud.dataflow as df
>
> from google.cloud.dataflow.io import iobase
>
> class DummyFileSource(iobase.Source):
>     """A source for a GCS or local dummy file.
>     """
>
>     def __init__(self, params):
>         self._params = params
>         return
>
>     @property
>     def format(self):
>         """Source format name required for remote execution."""
>         return 'binary'
>
> from google.cloud.dataflow import pvalue
> from google.cloud.dataflow.transforms import core
> from google.cloud.dataflow.transforms import ptransform
>
> class DummyRead(ptransform.PTransform):
>     """A transform that reads a PCollection."""
>
>     def __init__(self, *args, **kwargs):
>         """Initializes a DummyRead transform.
>
>         Args:
>         *args: A tuple of position arguments.
>         **kwargs: A dictionary of keyword arguments.
>
>         The *args, **kwargs are expected to be (label, source) or (source).
>         """
>
>         label, source = self.parse_label_and_arg(args, kwargs, 'source')
>         super(DummyRead, self).__init__(label)
>         self.source = source
>         return
>
>     def apply(self, pbegin):
>         assert isinstance(pbegin, pvalue.PBegin)
>         self.pipeline = pbegin.pipeline
>         return pvalue.PCollection(pipeline=self.pipeline, transform=self)
>
>     def get_windowing(self, unused_inputs):
>         return core.Windowing(window.GlobalWindows())
>
> def main(argv = None):
>     if argv is None:
>         argv = sys.argv
>
>     DummyFileSource('vat')
>     parser = argparse.ArgumentParser()
>     parser.add_argument('--baseURI',
>                         dest='baseURI',
>                         default='http://localhost:3000',
>                         help='Base URI.')
>
>     parser.add_argument('--fakeData',
>                         dest='fakeData',
>                         default='fakeData',
>                         help='Fake data')
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     p = df.Pipeline(argv=pipeline_args)
>
>     params = {}
>     postStackDummy = p | DummyRead('read',
>                                    DummyFileSource(params))
>
>     #
>     # Actually run the pipeline (all operations above are deferred).
>     #
>
>     p.run()
>
>     return
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     sys.exit(main(sys.argv) or 0)
>
> When I run this program, the following traceback is produced:
>
> Traceback (most recent call last):
>   File "sample.py", line 85, in <module>
>     sys.exit(main(sys.argv) or 0)
>   File "sample.py", line 79, in main
>     p.run()
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
> line 135, in run
>     return self.runner.run(self)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
> line 81, in run
>     pipeline.visit(RunVisitor(self), node=node)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
> line 168, in visit
>     start_transform.visit(visitor, self, visited)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
> line 376, in visit
>     part.visit(visitor, pipeline, visited)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
> line 379, in visit
>     visitor.visit_transform(self)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
> line 79, in visit_transform
>     self.runner.run_transform(transform_node)
>   File
>
> "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
> line 155, in run_transform
>     transform_node.transform, self))
> NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>]
> not implemented in runner
> <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at
> 0x7f42af19e750>.
>
> What I am having a hard time seeing is the connection between the label
> 'read' passed to the constructor of the class DummyRead and the place
> where the method needs to be implemented.  What am I missing?
>

Reply via email to