In Java, please follow [1] to build new custom sources and sinks.

[1] https://cloud.google.com/dataflow/model/custom-io
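For reference, a custom bounded source built along the lines of [1] roughly amounts to subclassing BoundedSource and supplying a reader for it. The sketch below assumes the Dataflow Java SDK 1.x API (com.google.cloud.dataflow.sdk.io.BoundedSource); DummySource, DummyReader, and baseUri are made-up names, and the splitting and size-estimation logic is deliberately trivial, so treat it as a starting point rather than a working implementation.

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

public class DummySource extends BoundedSource<String> {

  private final String baseUri;  // Illustrative: where a real reader would fetch data from.

  public DummySource(String baseUri) {
    this.baseUri = baseUri;
  }

  @Override
  public List<? extends BoundedSource<String>> splitIntoBundles(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // No splitting in this sketch: the whole source is read as a single bundle.
    return Collections.singletonList(this);
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    return 0;  // Unknown; a real source should return a real estimate.
  }

  @Override
  public boolean producesSortedKeys(PipelineOptions options) {
    return false;
  }

  @Override
  public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
    return new DummyReader(this);
  }

  @Override
  public void validate() {}

  @Override
  public Coder<String> getDefaultOutputCoder() {
    return StringUtf8Coder.of();
  }

  /** Returns a few hard-coded records; a real reader would talk to baseUri instead. */
  private static class DummyReader extends BoundedReader<String> {
    private final DummySource source;
    private final List<String> records = Arrays.asList("a", "b", "c");
    private int index = -1;

    DummyReader(DummySource source) {
      this.source = source;
    }

    @Override
    public boolean start() {
      return advance();
    }

    @Override
    public boolean advance() {
      index++;
      return index < records.size();
    }

    @Override
    public String getCurrent() throws NoSuchElementException {
      if (index < 0 || index >= records.size()) {
        throw new NoSuchElementException();
      }
      return records.get(index);
    }

    @Override
    public void close() {}

    @Override
    public BoundedSource<String> getCurrentSource() {
      return source;
    }
  }
}

Such a source would then be consumed with something like p.apply(Read.from(new DummySource("http://localhost:3000"))), using com.google.cloud.dataflow.sdk.io.Read.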
On Wed, Mar 9, 2016 at 7:57 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> Hi Joseph,
>
> yes, you can create your own PTransform in Java.
>
> For instance, something like:
>
>   static class MyPTransform extends
>       PTransform<PCollection<KV<String, String>>, PCollection<String>> {
>     @Override
>     public PCollection<String> apply(PCollection<KV<String, String>> data) {
>
>       PCollection... foo = data.apply(GroupByKey.create())....
>       PCollection... bar = foo.apply(ParDo.of(...));
>
>       return bar;
>     }
>   }
>
> Regards
> JB
>
> On 03/09/2016 02:44 PM, Joseph Winston wrote:
>> Can I write a new PTransform in Java then?
>>
>> On Tue, 08 Mar 2016 22:11:13 GMT, Chamikara Jayalath <[email protected]> wrote:
>>>
>>> Hi Joseph,
>>>
>>> Python SDK currently does not support creating new sources. Sources that
>>> are currently available are backed by Google Dataflow service.
>>> Theoretically it should be possible to get new sources working just for
>>> DirectPipelineRunner by hacking the SDK but this has not been tested
>>> properly.
>>>
>>> Please let us know if you run into any issues when using existing sources.
>>> We hope to add a framework for creating new sources soon.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]> wrote:
>>>
>>>> Adding Silviu, who can comment more.
>>>>
>>>> On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]> wrote:
>>>>
>>>>> I would like to use Apache Beam as my dataflow engine. Unfortunately for
>>>>> me, the input to the dataflow isn't a text file and the result of the
>>>>> pipeline is also not a text file. To see how difficult it would be to
>>>>> create the desired classes, I've subclassed ptransform.PTransform as well
>>>>> as iobase.Source and started on the read side of the problem. I've cloned
>>>>> and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
>>>>> on my VM and I am working with the most recent commit 3a56ce7.
>>>>>
>>>>> Next, I wrote the following code, which looks very close to the class Read
>>>>> in google/cloud/dataflow/io/iobase.py and TextFileSource in
>>>>> google/cloud/dataflow/io/iobase.py:
>>>>>
>>>>> import argparse
>>>>> import logging
>>>>> import sys
>>>>>
>>>>> import google.cloud.dataflow as df
>>>>>
>>>>> from google.cloud.dataflow.io import iobase
>>>>>
>>>>> class DummyFileSource(iobase.Source):
>>>>>   """A source for a GCS or local dummy file."""
>>>>>
>>>>>   def __init__(self, params):
>>>>>     self._params = params
>>>>>     return
>>>>>
>>>>>   @property
>>>>>   def format(self):
>>>>>     """Source format name required for remote execution."""
>>>>>     return 'binary'
>>>>>
>>>>> from google.cloud.dataflow import pvalue
>>>>> from google.cloud.dataflow.transforms import core
>>>>> from google.cloud.dataflow.transforms import ptransform
>>>>>
>>>>> class DummyRead(ptransform.PTransform):
>>>>>   """A transform that reads a PCollection."""
>>>>>
>>>>>   def __init__(self, *args, **kwargs):
>>>>>     """Initializes a DummyRead transform.
>>>>>
>>>>>     Args:
>>>>>       *args: A tuple of position arguments.
>>>>>       **kwargs: A dictionary of keyword arguments.
>>>>>
>>>>>     The *args, **kwargs are expected to be (label, source) or (source).
>>>>>     """
>>>>>     label, source = self.parse_label_and_arg(args, kwargs, 'source')
>>>>>     super(DummyRead, self).__init__(label)
>>>>>     self.source = source
>>>>>     return
>>>>>
>>>>>   def apply(self, pbegin):
>>>>>     assert isinstance(pbegin, pvalue.PBegin)
>>>>>     self.pipeline = pbegin.pipeline
>>>>>     return pvalue.PCollection(pipeline=self.pipeline, transform=self)
>>>>>
>>>>>   def get_windowing(self, unused_inputs):
>>>>>     return core.Windowing(window.GlobalWindows())
>>>>>
>>>>> def main(argv=None):
>>>>>   if argv is None:
>>>>>     argv = sys.argv
>>>>>
>>>>>   DummyFileSource('vat')
>>>>>   parser = argparse.ArgumentParser()
>>>>>   parser.add_argument('--baseURI',
>>>>>                       dest='baseURI',
>>>>>                       default='http://localhost:3000',
>>>>>                       help='Base URI.')
>>>>>   parser.add_argument('--fakeData',
>>>>>                       dest='fakeData',
>>>>>                       default='fakeData',
>>>>>                       help='Fake data')
>>>>>   known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>
>>>>>   p = df.Pipeline(argv=pipeline_args)
>>>>>
>>>>>   params = {}
>>>>>   postStackDummy = p | DummyRead('read', DummyFileSource(params))
>>>>>
>>>>>   #
>>>>>   # Actually run the pipeline (all operations above are deferred).
>>>>>   #
>>>>>   p.run()
>>>>>
>>>>>   return
>>>>>
>>>>> if __name__ == '__main__':
>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>   sys.exit(main(sys.argv) or 0)
>>>>>
>>>>> When I run this program, the following traceback is produced:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "sample.py", line 85, in <module>
>>>>>     sys.exit(main(sys.argv) or 0)
>>>>>   File "sample.py", line 79, in main
>>>>>     p.run()
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
>>>>>     return self.runner.run(self)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
>>>>>     pipeline.visit(RunVisitor(self), node=node)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
>>>>>     start_transform.visit(visitor, self, visited)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
>>>>>     part.visit(visitor, pipeline, visited)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
>>>>>     visitor.visit_transform(self)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
>>>>>     self.runner.run_transform(transform_node)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
>>>>>     transform_node.transform, self))
>>>>> NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>]
>>>>> not implemented in runner
>>>>> <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at
>>>>> 0x7f42af19e750>.
>>>>>
>>>>> What I am having a hard time seeing is the association of the label 'read'
>>>>> in the constructor of the class DummyRead and where the method needs to be
>>>>> implemented. What am I missing?
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
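The NotImplementedError above appears to come from the fact that DummyRead's apply() returns a PCollection whose producer is DummyRead itself, so the transform behaves as a brand-new primitive that the DirectPipelineRunner has no execution logic for; the label 'read' only names the step, it does not hook up any execution. A composite transform avoids this by expanding, inside apply(), into transforms the runner already knows (ParDo, GroupByKey, and so on). As a concrete illustration, here is one way JB's Java sketch above could be filled in, again assuming the Dataflow Java SDK 1.x API; FormatGroupsFn is a made-up DoFn used purely for illustration.

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class MyPTransform
    extends PTransform<PCollection<KV<String, String>>, PCollection<String>> {

  @Override
  public PCollection<String> apply(PCollection<KV<String, String>> data) {
    // Group the values by key, then format each group as a single string.
    PCollection<KV<String, Iterable<String>>> grouped =
        data.apply(GroupByKey.<String, String>create());
    return grouped.apply(ParDo.of(new FormatGroupsFn()));
  }

  // Made-up DoFn: emits one "key: [values...]" string per grouped entry.
  private static class FormatGroupsFn
      extends DoFn<KV<String, Iterable<String>>, String> {
    @Override
    public void processElement(ProcessContext c) {
      c.output(c.element().getKey() + ": " + c.element().getValue());
    }
  }
}

In a pipeline this would be used as input.apply(new MyPTransform()), where input is a PCollection<KV<String, String>>; the runner can execute it because every step it expands into is a transform the runner already supports.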
