Adding Silviu, who can comment more.

On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]> wrote:
> I would like to use Apache Beam as my dataflow engine. Unfortunately for
> me, the input to the dataflow isn't a text file and the result of the
> pipeline is also not a text file. To see how difficult it would be to
> create the desired classes, I've subclassed ptransform.PTransform as well
> as iobase.Source and started on the read side of the problem. I've cloned
> and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
> on my VM and I am working with the most recent commit 3a56ce7.
>
> Next, I wrote the following code, which looks very close to the class Read
> in google/cloud/dataflow/io/iobase.py and TextFileSource in
> google/cloud/dataflow/io/fileio.py:
>
> import argparse
> import logging
> import sys
>
> import google.cloud.dataflow as df
>
> from google.cloud.dataflow.io import iobase
>
>
> class DummyFileSource(iobase.Source):
>   """A source for a GCS or local dummy file."""
>
>   def __init__(self, params):
>     self._params = params
>     return
>
>   @property
>   def format(self):
>     """Source format name required for remote execution."""
>     return 'binary'
>
>
> from google.cloud.dataflow import pvalue
> from google.cloud.dataflow.transforms import core
> from google.cloud.dataflow.transforms import ptransform
> from google.cloud.dataflow.transforms import window  # needed by get_windowing
>
>
> class DummyRead(ptransform.PTransform):
>   """A transform that reads a PCollection."""
>
>   def __init__(self, *args, **kwargs):
>     """Initializes a DummyRead transform.
>
>     Args:
>       *args: A tuple of positional arguments.
>       **kwargs: A dictionary of keyword arguments.
>
>     The *args, **kwargs are expected to be (label, source) or (source).
>     """
>     label, source = self.parse_label_and_arg(args, kwargs, 'source')
>     super(DummyRead, self).__init__(label)
>     self.source = source
>     return
>
>   def apply(self, pbegin):
>     assert isinstance(pbegin, pvalue.PBegin)
>     self.pipeline = pbegin.pipeline
>     return pvalue.PCollection(pipeline=self.pipeline, transform=self)
>
>   def get_windowing(self, unused_inputs):
>     return core.Windowing(window.GlobalWindows())
>
>
> def main(argv=None):
>   if argv is None:
>     argv = sys.argv
>
>   DummyFileSource('vat')
>   parser = argparse.ArgumentParser()
>   parser.add_argument('--baseURI',
>                       dest='baseURI',
>                       default='http://localhost:3000',
>                       help='Base URI.')
>   parser.add_argument('--fakeData',
>                       dest='fakeData',
>                       default='fakeData',
>                       help='Fake data')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>
>   p = df.Pipeline(argv=pipeline_args)
>
>   params = {}
>   postStackDummy = p | DummyRead('read', DummyFileSource(params))
>
>   #
>   # Actually run the pipeline (all operations above are deferred).
>   #
>   p.run()
>
>   return
>
>
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   sys.exit(main(sys.argv) or 0)
>
> When I run this program, the following traceback is produced:
>
> Traceback (most recent call last):
>   File "sample.py", line 85, in <module>
>     sys.exit(main(sys.argv) or 0)
>   File "sample.py", line 79, in main
>     p.run()
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
>     return self.runner.run(self)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
>     pipeline.visit(RunVisitor(self), node=node)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
>     start_transform.visit(visitor, self, visited)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
>     visitor.visit_transform(self)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
>     self.runner.run_transform(transform_node)
>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
>     transform_node.transform, self))
> NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>]
> not implemented in runner
> <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at
> 0x7f42af19e750>.
>
> What I am having a hard time seeing is the association of the label 'read'
> in the constructor of the class DummyRead and where the method needs to be
> implemented. What am I missing?
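A quick pointer while Silviu weighs in: if I am reading runners/runner.py correctly, the
'read' label is only the transform's display name. run_transform dispatches on the
transform's class, looking for a matching run_<ClassName> method on the runner, and raises
the NotImplementedError you see when nothing in the class's MRO matches; a brand-new
PTransform subclass like DummyRead is therefore invisible to DirectPipelineRunner. The
usual pattern is to keep the custom logic in the Source and hand it to the SDK's existing
Read transform (the one in google/cloud/dataflow/io/iobase.py that DummyRead was modeled
on), which the runners already know how to execute. A rough, untested sketch of that
direction follows; DummyFileReader is a hypothetical placeholder, and the exact reader
contract should be checked against iobase.py at commit 3a56ce7:

import google.cloud.dataflow as df
from google.cloud.dataflow.io import iobase

class DummyFileSource(iobase.Source):
  """Custom source; the runner never needs to know this class exists."""

  def __init__(self, params):
    self._params = params

  @property
  def format(self):
    """Source format name required for remote execution."""
    return 'binary'

  def reader(self):
    # The built-in Read transform asks the source for a reader object that
    # yields the records. DummyFileReader is a hypothetical stand-in for
    # whatever actually talks to the non-text input.
    return DummyFileReader(self._params)

p = df.Pipeline(argv=[])
# Reuse the SDK's Read transform instead of a new PTransform subclass, so
# DirectPipelineRunner's existing handler for Read executes the source.
records = p | df.Read('read', DummyFileSource({}))
p.run()

With that arrangement nothing new has to be registered with the runner; only the source
and its reader carry the custom I/O.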
