Can I write a new PTransform in Java then?
> On Tue, 08 Mar 2016 22:11:13 GMT Chamikara Jayalath <[email protected]> wrote: > > Hi Joseph, > > Python SDK currently does not support creating new sources. Sources that > are currently available are backed by Google Dataflow service. > Theoretically it should be possible to get new sources working just for > DirectPipelineRunner by hacking the SDK but this has not been tested > properly. > > Please let us know if you run into any issues when using existing sources. > We hope to add a framework for creating new sources soon. > > Thanks, > Cham > > > On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]> > wrote: > > > Adding Silviu, who can comment more. > > > > On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston < > > [email protected]> > > wrote: > > > > > I would like to use Apache Beam as my dataflow engine. Unfortunately for > > > me, the input to the dataflow isn't a text file and the result of the > > > pipeline is also not a text file. To see how difficult it would be to > > > create the desired classes, I've subclassed ptransform.PTransform as well > > > as iobase.Source and started on the read side of the problem. I've > > cloned > > > and installed > > https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git > > > on my VM and I am working with the most recent commit 3a56ce7. > > > > > > Next, I wrote the following code, which looks very close to the class > > Read > > > in google/cloud/dataflow/io/iobase.py and TextFileSource in > > > google/cloud/dataflow/io/iobase.py > > > > > > import argparse > > > import logging > > > import sys > > > > > > import google.cloud.dataflow as df > > > > > > from google.cloud.dataflow.io import iobase > > > > > > class DummyFileSource(iobase.Source): > > > """A source for a GCS or local dummy file. 
> > > """ > > > > > > def __init__(self, params): > > > self._params = params > > > return > > > > > > @property > > > def format(self): > > > """Source format name required for remote execution.""" > > > return 'binary' > > > > > > from google.cloud.dataflow import pvalue > > > from google.cloud.dataflow.transforms import core > > > from google.cloud.dataflow.transforms import ptransform > > > > > > class DummyRead(ptransform.PTransform): > > > """A transform that reads a PCollection.""" > > > > > > def __init__(self, *args, **kwargs): > > > """Initializes a DummyRead transform. > > > > > > Args: > > > *args: A tuple of position arguments. > > > **kwargs: A dictionary of keyword arguments. > > > > > > The *args, **kwargs are expected to be (label, source) or > > (source). > > > """ > > > > > > label, source = self.parse_label_and_arg(args, kwargs, 'source') > > > super(DummyRead, self).__init__(label) > > > self.source = source > > > return > > > > > > def apply(self, pbegin): > > > assert isinstance(pbegin, pvalue.PBegin) > > > self.pipeline = pbegin.pipeline > > > return pvalue.PCollection(pipeline=self.pipeline, transform=self) > > > > > > def get_windowing(self, unused_inputs): > > > return core.Windowing(window.GlobalWindows()) > > > > > > def main(argv = None): > > > if argv is None: > > > argv = sys.argv > > > > > > DummyFileSource('vat') > > > parser = argparse.ArgumentParser() > > > parser.add_argument('--baseURI', > > > dest='baseURI', > > > default='http://localhost:3000', > > > help='Base URI.') > > > > > > parser.add_argument('--fakeData', > > > dest='fakeData', > > > default='fakeData', > > > help='Fake data') > > > known_args, pipeline_args = parser.parse_known_args(argv) > > > > > > p = df.Pipeline(argv=pipeline_args) > > > > > > params = {} > > > postStackDummy = p | DummyRead('read', > > > DummyFileSource(params)) > > > > > > # > > > # Actually run the pipeline (all operations above are deferred). 
> > > # > > > > > > p.run() > > > > > > return > > > > > > if __name__ == '__main__': > > > logging.getLogger().setLevel(logging.INFO) > > > sys.exit(main(sys.argv) or 0) > > > > > > When I run this program, the following traceback is produced: > > > > > > Traceback (most recent call last): > > > File "sample.py", line 85, in <module> > > > sys.exit(main(sys.argv) or 0) > > > File "sample.py", line 79, in main > > > p.run() > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", > > > line 135, in run > > > return self.runner.run(self) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", > > > line 81, in run > > > pipeline.visit(RunVisitor(self), node=node) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", > > > line 168, in visit > > > start_transform.visit(visitor, self, visited) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", > > > line 376, in visit > > > part.visit(visitor, pipeline, visited) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", > > > line 379, in visit > > > visitor.visit_transform(self) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", > > > line 79, in visit_transform > > > self.runner.run_transform(transform_node) > > > File > > > > > > > > > "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", > > > line 155, in 
run_transform > > > transform_node.transform, self)) > > > NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>] not implemented in runner <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at 0x7f42af19e750>. > > > > > > What I am having a hard time seeing is the association between the label 'read' in the constructor of the class DummyRead and the place where the method needs to be implemented. What am I missing?
