I would like to use Apache Beam as my dataflow engine. Unfortunately for
me, the input to the dataflow isn't a text file and the result of the
pipeline is also not a text file. To see how difficult it would be to
create the desired classes, I've subclassed ptransform.PTransform as well
as iobase.Source and started on the read side of the problem.  I've cloned
and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
on my VM and I am working with the most recent commit 3a56ce7.

Next, I wrote the following code, which looks very close to the class Read
in google/cloud/dataflow/io/iobase.py and TextFileSource in
google/cloud/dataflow/io/iobase.py

import argparse
import logging
import sys

import google.cloud.dataflow as df

from google.cloud.dataflow.io import iobase

class DummyFileSource(iobase.Source):
    """A source for a GCS or local dummy file.
    """

    def __init__(self, params):
        self._params = params
        return

    @property
    def format(self):
        """Source format name required for remote execution."""
        return 'binary'

from google.cloud.dataflow import pvalue
from google.cloud.dataflow.transforms import core
from google.cloud.dataflow.transforms import ptransform

class DummyRead(ptransform.PTransform):
    """A transform that reads a PCollection."""

    def __init__(self, *args, **kwargs):
        """Initializes a DummyRead transform.

        Args:
        *args: A tuple of position arguments.
        **kwargs: A dictionary of keyword arguments.

        The *args, **kwargs are expected to be (label, source) or (source).
        """

        label, source = self.parse_label_and_arg(args, kwargs, 'source')
        super(DummyRead, self).__init__(label)
        self.source = source
        return

    def apply(self, pbegin):
        assert isinstance(pbegin, pvalue.PBegin)
        self.pipeline = pbegin.pipeline
        return pvalue.PCollection(pipeline=self.pipeline, transform=self)

    def get_windowing(self, unused_inputs):
        return core.Windowing(window.GlobalWindows())

def main(argv = None):
    if argv is None:
        argv = sys.argv

    DummyFileSource('vat')
    parser = argparse.ArgumentParser()
    parser.add_argument('--baseURI',
                        dest='baseURI',
                        default='http://localhost:3000',
                        help='Base URI.')

    parser.add_argument('--fakeData',
                        dest='fakeData',
                        default='fakeData',
                        help='Fake data')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = df.Pipeline(argv=pipeline_args)

    params = {}
    postStackDummy = p | DummyRead('read',
                                   DummyFileSource(params))

    #
    # Actually run the pipeline (all operations above are deferred).
    #

    p.run()

    return

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    sys.exit(main(sys.argv) or 0)

When I run this program, the following traceback is produced:

Traceback (most recent call last):
  File "sample.py", line 85, in <module>
    sys.exit(main(sys.argv) or 0)
  File "sample.py", line 79, in main
    p.run()
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 135, in run
    return self.runner.run(self)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 81, in run
    pipeline.visit(RunVisitor(self), node=node)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 168, in visit
    start_transform.visit(visitor, self, visited)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 376, in visit
    part.visit(visitor, pipeline, visited)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 379, in visit
    visitor.visit_transform(self)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 79, in visit_transform
    self.runner.run_transform(transform_node)
  File
"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 155, in run_transform
    transform_node.transform, self))
NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>]
not implemented in runner
<google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at
0x7f42af19e750>.

What I am having a hard time seeing is the association of the label 'read'
in the constructor of the class DummyRead and where the method needs to be
implemented.  What am I missing?

Reply via email to