Good point, Davor, thanks!

My bad, I will update the examples ;)

Regards
JB

On 03/09/2016 05:21 PM, Davor Bonaci wrote:
In Java, please follow [1] to build new custom sources and sinks.

[1] https://cloud.google.com/dataflow/model/custom-io

On Wed, Mar 9, 2016 at 7:57 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

Hi Joseph,

yes, you can create your own PTransform in Java.

For instance, something like:

   static class MyPTransform extends
       PTransform<PCollection<KV<String, String>>, PCollection<String>> {
     @Override
     public PCollection<String> apply(PCollection<KV<String, String>> data) {

         PCollection... foo = data.apply(GroupByKey.create())....
         PCollection... bar = foo.apply(ParDo.of(...));

         return bar;
     }
   }
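
For readers following the Python side of this thread, the data flow of the composite above (group by key, then a per-group step) can be sketched in plain Python with no SDK at all. This is only an illustration of the shape of the data: group_by_key, format_group and my_composite are hypothetical names, and format_group is an invented stand-in for the ParDo body that is elided in the Java snippet.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Plain-Python stand-in for GroupByKey: (key, value) pairs -> {key: [values]}."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def format_group(key, values):
    """Hypothetical per-group step; the real ParDo body is not given in the thread."""
    return '%s: %s' % (key, ','.join(sorted(values)))


def my_composite(pairs):
    """Chains the two steps, the way apply() chains transforms in the composite."""
    grouped = group_by_key(pairs)
    # Sort by key so the result is deterministic for this in-memory sketch.
    return [format_group(key, values) for key, values in sorted(grouped.items())]


if __name__ == '__main__':
    for line in my_composite([('a', 'x'), ('b', 'y'), ('a', 'z')]):
        print(line)
```

In a real pipeline the grouping and the per-element work would run on the runner, not in the calling process; the sketch only shows that KV pairs go in and one string per key comes out.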

Regards
JB


On 03/09/2016 02:44 PM, Joseph Winston wrote:

Can I write a new PTransform in Java then?

On Tue, 08 Mar 2016 22:11:13 GMT Chamikara Jayalath <[email protected]>
wrote:


Hi Joseph,

The Python SDK currently does not support creating new sources. The
sources that are currently available are backed by the Google Dataflow
service. Theoretically, it should be possible to get new sources working
just for DirectPipelineRunner by hacking the SDK, but this has not been
tested properly.

Please let us know if you run into any issues when using the existing
sources. We hope to add a framework for creating new sources soon.

Thanks,
Cham


On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]>
wrote:

Adding Silviu, who can comment more.

On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]>
wrote:

I would like to use Apache Beam as my dataflow engine. Unfortunately for
me, the input to the dataflow isn't a text file and the result of the
pipeline is also not a text file. To see how difficult it would be to
create the desired classes, I've subclassed ptransform.PTransform as well
as iobase.Source and started on the read side of the problem. I've cloned
and installed

https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git

on my VM and I am working with the most recent commit 3a56ce7.

Next, I wrote the following code, which looks very close to the class
Read in google/cloud/dataflow/io/iobase.py and TextFileSource in
google/cloud/dataflow/io/iobase.py:

import argparse
import logging
import sys

import google.cloud.dataflow as df

from google.cloud.dataflow.io import iobase

class DummyFileSource(iobase.Source):
      """A source for a GCS or local dummy file.
      """

      def __init__(self, params):
          self._params = params
          return

      @property
      def format(self):
          """Source format name required for remote execution."""
          return 'binary'

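from google.cloud.dataflow import pvalue
from google.cloud.dataflow.transforms import core
from google.cloud.dataflow.transforms import ptransform
# Needed by get_windowing() below, which refers to window.GlobalWindows().
from google.cloud.dataflow.transforms import window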

class DummyRead(ptransform.PTransform):
      """A transform that reads a PCollection."""

      def __init__(self, *args, **kwargs):
          """Initializes a DummyRead transform.

          Args:
          *args: A tuple of position arguments.
          **kwargs: A dictionary of keyword arguments.

          The *args, **kwargs are expected to be (label, source) or
          (source).

          """

          label, source = self.parse_label_and_arg(args, kwargs,
                                                   'source')

          super(DummyRead, self).__init__(label)
          self.source = source
          return

      def apply(self, pbegin):
          assert isinstance(pbegin, pvalue.PBegin)
          self.pipeline = pbegin.pipeline
          return pvalue.PCollection(pipeline=self.pipeline,
                                    transform=self)


      def get_windowing(self, unused_inputs):
          return core.Windowing(window.GlobalWindows())

def main(argv = None):
      if argv is None:
          argv = sys.argv

      DummyFileSource('vat')
      parser = argparse.ArgumentParser()
      parser.add_argument('--baseURI',
                          dest='baseURI',
                          default='http://localhost:3000',
                          help='Base URI.')

      parser.add_argument('--fakeData',
                          dest='fakeData',
                          default='fakeData',
                          help='Fake data')
      known_args, pipeline_args = parser.parse_known_args(argv)

      p = df.Pipeline(argv=pipeline_args)

      params = {}
      postStackDummy = p | DummyRead('read',
                                     DummyFileSource(params))

      #
      # Actually run the pipeline (all operations above are deferred).
      #

      p.run()

      return

if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      sys.exit(main(sys.argv) or 0)

When I run this program, the following traceback is produced:

Traceback (most recent call last):
    File "sample.py", line 85, in <module>
      sys.exit(main(sys.argv) or 0)
    File "sample.py", line 79, in main
      p.run()
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
      return self.runner.run(self)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
      pipeline.visit(RunVisitor(self), node=node)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
      start_transform.visit(visitor, self, visited)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
      part.visit(visitor, pipeline, visited)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
      visitor.visit_transform(self)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
      self.runner.run_transform(transform_node)
    File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
      transform_node.transform, self))
NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>] not implemented in runner <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at 0x7f42af19e750>.

What I am having a hard time seeing is the association of the label
'read' in the constructor of the class DummyRead and where the method
needs to be implemented. What am I missing?
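
One way to read the traceback is that the runner decides how to execute a transform from the transform's class, not from its label, and raises NotImplementedError when it has no handler for that class. The toy model below sketches that idea in plain Python. It is an assumption about the mechanism, not a copy of the SDK code: ToyRunner, the run_<ClassName> naming scheme, and the stand-in PTransform, Read, SubclassedRead and DummyRead classes are all hypothetical and only mimic the names used in this thread.

```python
class PTransform(object):
    """Hypothetical stand-in for ptransform.PTransform."""

    def __init__(self, label=None):
        self.label = label

    def __repr__(self):
        # The label only shows up in the printed name of the transform.
        return '<%s(PTransform) label=[%s]>' % (type(self).__name__, self.label)


class Read(PTransform):
    """Stand-in for a transform the toy runner knows how to execute."""


class SubclassedRead(Read):
    """Inherits from Read, so it is found through the class hierarchy."""


class DummyRead(PTransform):
    """Derives from PTransform directly, like the class in the question."""


class ToyRunner(object):
    """Toy runner that dispatches on the transform's class name (assumed scheme)."""

    def run_transform(self, transform):
        # Walk the class hierarchy and use the first matching run_<ClassName>.
        for cls in type(transform).mro():
            handler = getattr(self, 'run_%s' % cls.__name__, None)
            if handler is not None:
                return handler(transform)
        raise NotImplementedError(
            'Execution of [%s] not implemented in runner %s.' % (transform, self))

    def run_Read(self, transform):
        return 'run_Read handled %s' % transform.label


if __name__ == '__main__':
    runner = ToyRunner()
    print(runner.run_transform(Read('read')))
    print(runner.run_transform(SubclassedRead('read')))
    try:
        runner.run_transform(DummyRead('read'))
    except NotImplementedError as err:
        print('NotImplementedError: %s' % err)
```

In this model the label 'read' plays no part in the lookup. DummyRead fails because no handler exists for DummyRead or for any of its base classes, while SubclassedRead works because it inherits from a class the runner already handles.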



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com



