Hi Joseph,

yes, you can create your own PTransform in Java.

For instance, something like:

  static class MyPTransform extends
      PTransform<PCollection<KV<String, String>>, PCollection<String>> {
    @Override
public PCollection<String> apply(PCollection<KV<String, String>> data) {

        PCollection... foo = data.apply(GroupByKey.create())....
        PCollection... bar = foo.apply(ParDo.of(...));

        return bar;
   }}

Regards
JB

On 03/09/2016 02:44 PM, Joseph Winston wrote:
Can I write a new PTransform in Java then?

On Tue, 08 Mar 2016 22:11:13 GMT Chamikara Jayalath <[email protected]>
wrote:

Hi Joseph,

Python SDK currently does not support creating new sources. Sources that
are currently available are backed by Google Dataflow service.
Theoretically it should be possible to get new sources working just for
DirectPipelineRunner by hacking the SDK but this has not been tested
properly.

Please let us know if you run into any issues when using existing sources.
We hope to add a framework for creating new sources soon.

Thanks,
Cham


On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]>
wrote:

Adding Silviu, who can comment more.

On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <
[email protected]>
wrote:

I would like to use Apache Beam as my dataflow engine. Unfortunately
for
me, the input to the dataflow isn't a text file and the result of the
pipeline is also not a text file. To see how difficult it would be to
create the desired classes, I've subclassed ptransform.PTransform as
well
as iobase.Source and started on the read side of the problem.  I've
cloned
and installed
https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
on my VM and I am working with the most recent commit 3a56ce7.

Next, I wrote the following code, which looks very close to the class
Read
in google/cloud/dataflow/io/iobase.py and TextFileSource in
google/cloud/dataflow/io/iobase.py

import argparse
import logging
import sys

import google.cloud.dataflow as df

from google.cloud.dataflow.io import iobase

class DummyFileSource(iobase.Source):
     """A source for a GCS or local dummy file.
     """

     def __init__(self, params):
         self._params = params
         return

     @property
     def format(self):
         """Source format name required for remote execution."""
         return 'binary'

from google.cloud.dataflow import pvalue
from google.cloud.dataflow.transforms import core
from google.cloud.dataflow.transforms import ptransform

class DummyRead(ptransform.PTransform):
     """A transform that reads a PCollection."""

     def __init__(self, *args, **kwargs):
         """Initializes a DummyRead transform.

         Args:
         *args: A tuple of position arguments.
         **kwargs: A dictionary of keyword arguments.

         The *args, **kwargs are expected to be (label, source) or
(source).
         """

         label, source = self.parse_label_and_arg(args, kwargs,
'source')
         super(DummyRead, self).__init__(label)
         self.source = source
         return

     def apply(self, pbegin):
         assert isinstance(pbegin, pvalue.PBegin)
         self.pipeline = pbegin.pipeline
         return pvalue.PCollection(pipeline=self.pipeline,
transform=self)

     def get_windowing(self, unused_inputs):
         return core.Windowing(window.GlobalWindows())

def main(argv = None):
     if argv is None:
         argv = sys.argv

     DummyFileSource('vat')
     parser = argparse.ArgumentParser()
     parser.add_argument('--baseURI',
                         dest='baseURI',
                         default='http://localhost:3000',
                         help='Base URI.')

     parser.add_argument('--fakeData',
                         dest='fakeData',
                         default='fakeData',
                         help='Fake data')
     known_args, pipeline_args = parser.parse_known_args(argv)

     p = df.Pipeline(argv=pipeline_args)

     params = {}
     postStackDummy = p | DummyRead('read',
                                    DummyFileSource(params))

     #
     # Actually run the pipeline (all operations above are deferred).
     #

     p.run()

     return

if __name__ == '__main__':
     logging.getLogger().setLevel(logging.INFO)
     sys.exit(main(sys.argv) or 0)

When I run this program, the following traceback is produced:

Traceback (most recent call last):
   File "sample.py", line 85, in <module>
     sys.exit(main(sys.argv) or 0)
   File "sample.py", line 79, in main
     p.run()
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 135, in run
     return self.runner.run(self)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 81, in run
     pipeline.visit(RunVisitor(self), node=node)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 168, in visit
     start_transform.visit(visitor, self, visited)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 376, in visit
     part.visit(visitor, pipeline, visited)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py",
line 379, in visit
     visitor.visit_transform(self)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 79, in visit_transform
     self.runner.run_transform(transform_node)
   File



"/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py",
line 155, in run_transform
     transform_node.transform, self))
NotImplementedError: Execution of [<DummyRead(PTransform)
label=[read]>]
not implemented in runner
<google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner
object
at
0x7f42af19e750>.

What I am having a hard time seeing is the association of the label
'read'
in the constructor of the class DummyRead and where the method needs
to
be
implemented.  What am I missing?


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to