Can I write a new PTransform in Java then?

> On Tue, 08 Mar 2016 22:11:13 GMT Chamikara Jayalath <[email protected]> wrote:
>
> Hi Joseph,
>
> The Python SDK currently does not support creating new sources. The sources
> that are currently available are backed by the Google Dataflow service.
> Theoretically, it should be possible to get new sources working just for
> DirectPipelineRunner by hacking the SDK, but this has not been tested
> properly.
>
> Please let us know if you run into any issues when using existing sources.
> We hope to add a framework for creating new sources soon.
>
> Thanks,
> Cham
>
>
> On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]>
> wrote:
>
> > Adding Silviu, who can comment more.
> >
> > On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]> wrote:
> >
> > > I would like to use Apache Beam as my dataflow engine. Unfortunately for
> > > me, the input to the dataflow isn't a text file and the result of the
> > > pipeline is also not a text file. To see how difficult it would be to
> > > create the desired classes, I've subclassed ptransform.PTransform as well
> > > as iobase.Source and started on the read side of the problem. I've cloned
> > > and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
> > > on my VM and I am working with the most recent commit 3a56ce7.
> > >
> > > Next, I wrote the following code, which looks very close to the class
> > > Read in google/cloud/dataflow/io/iobase.py and TextFileSource in
> > > google/cloud/dataflow/io/fileio.py
> > >
> > > import argparse
> > > import logging
> > > import sys
> > >
> > > import google.cloud.dataflow as df
> > >
> > > from google.cloud.dataflow.io import iobase
> > >
> > > class DummyFileSource(iobase.Source):
> > >     """A source for a GCS or local dummy file.
> > >     """
> > >
> > >     def __init__(self, params):
> > >         self._params = params
> > >         return
> > >
> > >     @property
> > >     def format(self):
> > >         """Source format name required for remote execution."""
> > >         return 'binary'
> > >
> > > from google.cloud.dataflow import pvalue
> > > from google.cloud.dataflow.transforms import core, window
> > > from google.cloud.dataflow.transforms import ptransform
> > >
> > > class DummyRead(ptransform.PTransform):
> > >     """A transform that reads a PCollection."""
> > >
> > >     def __init__(self, *args, **kwargs):
> > >         """Initializes a DummyRead transform.
> > >
> > >         Args:
> > >         *args: A tuple of position arguments.
> > >         **kwargs: A dictionary of keyword arguments.
> > >
> > >         The *args, **kwargs are expected to be (label, source) or (source).
> > >         """
> > >
> > >         label, source = self.parse_label_and_arg(args, kwargs, 'source')
> > >         super(DummyRead, self).__init__(label)
> > >         self.source = source
> > >         return
> > >
> > >     def apply(self, pbegin):
> > >         assert isinstance(pbegin, pvalue.PBegin)
> > >         self.pipeline = pbegin.pipeline
> > >         return pvalue.PCollection(pipeline=self.pipeline, transform=self)
> > >
> > >     def get_windowing(self, unused_inputs):
> > >         return core.Windowing(window.GlobalWindows())
> > >
> > > def main(argv = None):
> > >     if argv is None:
> > >         argv = sys.argv
> > >
> > >     DummyFileSource('vat')
> > >     parser = argparse.ArgumentParser()
> > >     parser.add_argument('--baseURI',
> > >                         dest='baseURI',
> > >                         default='http://localhost:3000',
> > >                         help='Base URI.')
> > >
> > >     parser.add_argument('--fakeData',
> > >                         dest='fakeData',
> > >                         default='fakeData',
> > >                         help='Fake data')
> > >     known_args, pipeline_args = parser.parse_known_args(argv)
> > >
> > >     p = df.Pipeline(argv=pipeline_args)
> > >
> > >     params = {}
> > >     postStackDummy = p | DummyRead('read',
> > >                                    DummyFileSource(params))
> > >
> > >     #
> > >     # Actually run the pipeline (all operations above are deferred).
> > >     #
> > >
> > >     p.run()
> > >
> > >     return
> > >
> > > if __name__ == '__main__':
> > >     logging.getLogger().setLevel(logging.INFO)
> > >     sys.exit(main(sys.argv) or 0)
> > >
> > > When I run this program, the following traceback is produced:
> > >
> > > Traceback (most recent call last):
> > >   File "sample.py", line 85, in <module>
> > >     sys.exit(main(sys.argv) or 0)
> > >   File "sample.py", line 79, in main
> > >     p.run()
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
> > >     return self.runner.run(self)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
> > >     pipeline.visit(RunVisitor(self), node=node)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
> > >     start_transform.visit(visitor, self, visited)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
> > >     part.visit(visitor, pipeline, visited)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
> > >     visitor.visit_transform(self)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
> > >     self.runner.run_transform(transform_node)
> > >   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
> > >     transform_node.transform, self))
> > > NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>] not implemented in runner <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at 0x7f42af19e750>.
> > >
> > > What I am having a hard time seeing is the association of the label
> > > 'read' in the constructor of the class DummyRead and where the method
> > > needs to be implemented. What am I missing?
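
On the question of where the method "needs to be implemented": one plausible reading of the traceback is that the runner, not the transform, has to supply the implementation, and that it is looked up by the transform's class name. The label 'read' would then appear in the message only because it is part of the transform's printed form. The sketch below is an assumption made for illustration, not the SDK's code: PTransform, Read, DummyRead and SketchRunner are simplified stand-ins, and the run_<ClassName> naming convention is hypothetical.

```python
class PTransform(object):
    """Hypothetical stand-in for a transform base class; it only stores a label."""

    def __init__(self, label=None):
        self.label = label

    def __repr__(self):
        return '<%s(PTransform) label=[%s]>' % (type(self).__name__, self.label)


class Read(PTransform):
    """Stand-in for a built-in transform that the runner knows how to execute."""


class DummyRead(PTransform):
    """Stand-in for a user-defined transform with no runner support."""


class SketchRunner(object):
    """Hypothetical runner that dispatches on the transform's class name."""

    def run_Read(self, transform):
        # The only transform this runner can execute.
        return 'read handled for label %s' % transform.label

    def run_transform(self, transform):
        # Walk the class hierarchy and look for a run_<ClassName> method.
        for cls in type(transform).mro():
            handler = getattr(self, 'run_%s' % cls.__name__, None)
            if handler is not None:
                return handler(transform)
        # No handler found for the class or any of its bases.
        raise NotImplementedError(
            'Execution of [%s] not implemented in runner %s.'
            % (transform, type(self).__name__))


if __name__ == '__main__':
    runner = SketchRunner()
    print(runner.run_transform(Read('read')))
    try:
        runner.run_transform(DummyRead('read'))
    except NotImplementedError as exc:
        print(exc)
```

Under this assumption, changing the label would not help. The transform would have to derive from a class the runner already handles, be composed of such transforms, or the runner itself would need a matching method.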
