In Java, please follow [1] to build new custom sources and sinks.

[1] https://cloud.google.com/dataflow/model/custom-io
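
For reference, here is a minimal sketch of the kind of bounded source [1]
describes, assuming the Dataflow Java SDK's BoundedSource API (the class
names and the single-bundle / simple-size-estimate shortcuts are illustrative
only, and exact method signatures can differ slightly between SDK versions):

  import com.google.cloud.dataflow.sdk.coders.Coder;
  import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
  import com.google.cloud.dataflow.sdk.io.BoundedSource;
  import com.google.cloud.dataflow.sdk.options.PipelineOptions;

  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
  import java.util.NoSuchElementException;

  public class InMemoryStringSource extends BoundedSource<String> {

    private final List<String> records;

    public InMemoryStringSource(List<String> records) {
      this.records = records;
    }

    @Override
    public List<? extends BoundedSource<String>> splitIntoBundles(
        long desiredBundleSizeBytes, PipelineOptions options) {
      // Keep everything in one bundle for simplicity; a real source would
      // split its input here so the service can parallelize reads.
      return Collections.singletonList(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) {
      long size = 0;
      for (String record : records) {
        size += record.length();
      }
      return size;
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) {
      return false;
    }

    @Override
    public BoundedReader<String> createReader(PipelineOptions options) {
      return new InMemoryStringReader(this);
    }

    @Override
    public void validate() {}

    @Override
    public Coder<String> getDefaultOutputCoder() {
      return StringUtf8Coder.of();
    }

    private static class InMemoryStringReader extends BoundedReader<String> {
      private final InMemoryStringSource source;
      private int index = -1;

      InMemoryStringReader(InMemoryStringSource source) {
        this.source = source;
      }

      @Override
      public boolean start() throws IOException {
        return advance();
      }

      @Override
      public boolean advance() throws IOException {
        index++;
        return index < source.records.size();
      }

      @Override
      public String getCurrent() throws NoSuchElementException {
        if (index < 0 || index >= source.records.size()) {
          throw new NoSuchElementException();
        }
        return source.records.get(index);
      }

      @Override
      public void close() throws IOException {}

      @Override
      public BoundedSource<String> getCurrentSource() {
        return source;
      }
    }
  }

Such a source is read with Read.from, e.g.
p.apply(Read.from(new InMemoryStringSource(myRecords))), where myRecords is
any serializable List<String>.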

On Wed, Mar 9, 2016 at 7:57 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Hi Joseph,
>
> yes, you can create your own PTransform in Java.
>
> For instance, something like:
>
>   static class MyPTransform extends
>       PTransform<PCollection<KV<String, String>>, PCollection<String>> {
>     @Override
>     public PCollection<String> apply(PCollection<KV<String, String>> data) {
>
>       PCollection... foo = data.apply(GroupByKey.create())....
>       PCollection... bar = foo.apply(ParDo.of(...));
>
>       return bar;
>     }
>   }
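>
> Filled in, a concrete version of that sketch could look like the following
> (the GroupByKey/ParDo shape is the same; the DoFn body and the example
> input below are purely illustrative, not something the SDK prescribes).
> Assuming the usual Dataflow Java SDK imports:
>
>   import com.google.cloud.dataflow.sdk.Pipeline;
>   import com.google.cloud.dataflow.sdk.coders.KvCoder;
>   import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
>   import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>   import com.google.cloud.dataflow.sdk.transforms.Create;
>   import com.google.cloud.dataflow.sdk.transforms.DoFn;
>   import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
>   import com.google.cloud.dataflow.sdk.transforms.PTransform;
>   import com.google.cloud.dataflow.sdk.transforms.ParDo;
>   import com.google.cloud.dataflow.sdk.values.KV;
>   import com.google.cloud.dataflow.sdk.values.PCollection;
>
>   static class MyPTransform extends
>       PTransform<PCollection<KV<String, String>>, PCollection<String>> {
>     @Override
>     public PCollection<String> apply(PCollection<KV<String, String>> data) {
>       // Group the values for each key.
>       PCollection<KV<String, Iterable<String>>> grouped =
>           data.apply(GroupByKey.<String, String>create());
>       // Turn each group into a single output string (illustrative only).
>       return grouped.apply(ParDo.of(
>           new DoFn<KV<String, Iterable<String>>, String>() {
>             @Override
>             public void processElement(ProcessContext c) {
>               // Emit the key followed by the string form of its values.
>               c.output(c.element().getKey() + ": " + c.element().getValue());
>             }
>           }));
>     }
>   }
>
> The composite is then applied like any built-in transform, e.g. inside
> main():
>
>   Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
>   PCollection<KV<String, String>> input = p.apply(
>       Create.of(KV.of("k1", "a"), KV.of("k1", "b"), KV.of("k2", "c"))
>             .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
>   PCollection<String> result = input.apply(new MyPTransform());
>   p.run();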
>
> Regards
> JB
>
>
> On 03/09/2016 02:44 PM, Joseph Winston wrote:
>
>> Can I write a new PTransform in Java then?
>>
>> On Tue, 08 Mar 2016 22:11:13 GMT Chamikara Jayalath <[email protected]> wrote:
>>
>>>
>>> Hi Joseph,
>>>
>>> The Python SDK currently does not support creating new sources. The
>>> sources that are currently available are backed by the Google Dataflow
>>> service. Theoretically it should be possible to get new sources working
>>> just for the DirectPipelineRunner by hacking the SDK, but this has not
>>> been tested properly.
>>>
>>> Please let us know if you run into any issues when using existing
>>> sources.
>>> We hope to add a framework for creating new sources soon.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <[email protected]>
>>> wrote:
>>>
>>> Adding Silviu, who can comment more.
>>>>
>>>> On Tue, Mar 8, 2016 at 11:04 AM, Joseph Winston <[email protected]> wrote:
>>>>
>>>>> I would like to use Apache Beam as my dataflow engine. Unfortunately
>>>>> for me, the input to the dataflow isn't a text file and the result of
>>>>> the pipeline is also not a text file. To see how difficult it would be
>>>>> to create the desired classes, I've subclassed ptransform.PTransform as
>>>>> well as iobase.Source and started on the read side of the problem.  I've
>>>>> cloned and installed
>>>>> https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
>>>>> on my VM and I am working with the most recent commit 3a56ce7.
>>>>>
>>>>> Next, I wrote the following code, which looks very close to the class
>>>>> Read in google/cloud/dataflow/io/iobase.py and TextFileSource in
>>>>> google/cloud/dataflow/io/iobase.py
>>>>>
>>>>> import argparse
>>>>> import logging
>>>>> import sys
>>>>>
>>>>> import google.cloud.dataflow as df
>>>>>
>>>>> from google.cloud.dataflow.io import iobase
>>>>>
>>>>> class DummyFileSource(iobase.Source):
>>>>>      """A source for a GCS or local dummy file.
>>>>>      """
>>>>>
>>>>>      def __init__(self, params):
>>>>>          self._params = params
>>>>>          return
>>>>>
>>>>>      @property
>>>>>      def format(self):
>>>>>          """Source format name required for remote execution."""
>>>>>          return 'binary'
>>>>>
>>>>> from google.cloud.dataflow import pvalue
>>>>> from google.cloud.dataflow.transforms import core
>>>>> from google.cloud.dataflow.transforms import ptransform
>>>>> # Needed by get_windowing() below; missing from the original snippet.
>>>>> from google.cloud.dataflow.transforms import window
>>>>>
>>>>> class DummyRead(ptransform.PTransform):
>>>>>      """A transform that reads a PCollection."""
>>>>>
>>>>>      def __init__(self, *args, **kwargs):
>>>>>          """Initializes a DummyRead transform.
>>>>>
>>>>>          Args:
>>>>>          *args: A tuple of positional arguments.
>>>>>          **kwargs: A dictionary of keyword arguments.
>>>>>
>>>>>          The *args, **kwargs are expected to be (label, source) or
>>>>>          (source).
>>>>>          """
>>>>>
>>>>>          label, source = self.parse_label_and_arg(args, kwargs, 'source')
>>>>>          super(DummyRead, self).__init__(label)
>>>>>          self.source = source
>>>>>          return
>>>>>
>>>>>      def apply(self, pbegin):
>>>>>          assert isinstance(pbegin, pvalue.PBegin)
>>>>>          self.pipeline = pbegin.pipeline
>>>>>          return pvalue.PCollection(pipeline=self.pipeline,
>>>>>                                    transform=self)
>>>>>
>>>>>      def get_windowing(self, unused_inputs):
>>>>>          return core.Windowing(window.GlobalWindows())
>>>>>
>>>>> def main(argv = None):
>>>>>      if argv is None:
>>>>>          argv = sys.argv
>>>>>
>>>>>      DummyFileSource('vat')
>>>>>      parser = argparse.ArgumentParser()
>>>>>      parser.add_argument('--baseURI',
>>>>>                          dest='baseURI',
>>>>>                          default='http://localhost:3000',
>>>>>                          help='Base URI.')
>>>>>
>>>>>      parser.add_argument('--fakeData',
>>>>>                          dest='fakeData',
>>>>>                          default='fakeData',
>>>>>                          help='Fake data')
>>>>>      known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>
>>>>>      p = df.Pipeline(argv=pipeline_args)
>>>>>
>>>>>      params = {}
>>>>>      postStackDummy = p | DummyRead('read',
>>>>>                                     DummyFileSource(params))
>>>>>
>>>>>      #
>>>>>      # Actually run the pipeline (all operations above are deferred).
>>>>>      #
>>>>>
>>>>>      p.run()
>>>>>
>>>>>      return
>>>>>
>>>>> if __name__ == '__main__':
>>>>>      logging.getLogger().setLevel(logging.INFO)
>>>>>      sys.exit(main(sys.argv) or 0)
>>>>>
>>>>> When I run this program, the following traceback is produced:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "sample.py", line 85, in <module>
>>>>>     sys.exit(main(sys.argv) or 0)
>>>>>   File "sample.py", line 79, in main
>>>>>     p.run()
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
>>>>>     return self.runner.run(self)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
>>>>>     pipeline.visit(RunVisitor(self), node=node)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
>>>>>     start_transform.visit(visitor, self, visited)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
>>>>>     part.visit(visitor, pipeline, visited)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
>>>>>     visitor.visit_transform(self)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
>>>>>     self.runner.run_transform(transform_node)
>>>>>   File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
>>>>>     transform_node.transform, self))
>>>>> NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>] not implemented in runner <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at 0x7f42af19e750>.
>>>>>
>>>>> What I am having a hard time seeing is the association of the label
>>>>> 'read' in the constructor of the class DummyRead and where the method
>>>>> needs to be implemented.  What am I missing?
>>>>>
>>>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
