[ 
https://issues.apache.org/jira/browse/BEAM-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989373#comment-16989373
 ] 

Nishant Trivedi commented on BEAM-106:
--------------------------------------

Hello,

I'm new to Dataflow, and the job I'm trying to write operates on objects 
stored in GCS. The problem is that the objects can be stored at an arbitrary 
depth, like {{gs://bucket/a/b/c/1/<object1>, gs://bucket/a/b/c/1/<object2>, 
gs://bucket/a/b/c/2/<object1>, gs://bucket/a/b/c/2/<object2> ...}}. I have to 
collect all objects in a list before the job can operate on them. If I 
want to list the objects as a pipeline step, I think it would require 
conditional steps, and I don't know how to do that. Is there a way to do 
conditional steps? Alternatively, is there a native way of doing a directory 
walk in Beam? I'm also attaching a naive version of the {{walk}} below to make 
my use case clear:
{code}
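from collections import deque
from os import path

from tensorflow.io import gfile


def walker(base_path):
    # Breadth-first walk: dir_q holds paths that still need to be inspected.
    dir_q = deque(gfile.glob(path.join(base_path, "*")))
    object_q = []
    while dir_q:
        current = dir_q.popleft()
        if gfile.isdir(current):
            # Queue the directory's children for inspection.
            dir_q.extend(gfile.glob(path.join(current, "*")))
        else:
            # Anything that is not a directory is an object to process.
            object_q.append(current)
    return object_q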
{code}
I should also mention that there are a couple of million objects, so doing the 
listing serially as above is not very efficient.
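
To illustrate the kind of parallelism I have in mind, here is a minimal sketch of a level-by-level walk in which all directories at one depth are listed concurrently. It is not a Beam-native solution. The names {{list_children}}, {{parallel_walk}}, {{list_fn}} and {{max_workers}} are hypothetical, and the local filesystem ({{os.scandir}}) stands in for GCS; for GCS, a listing function built on {{gfile.glob}} and {{gfile.isdir}} could presumably be passed as {{list_fn}} instead.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def list_children(directory):
    """Return (subdirectories, files) found directly under `directory`.

    Hypothetical helper: uses the local filesystem as a stand-in for GCS.
    """
    subdirs, files = [], []
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_dir():
                subdirs.append(entry.path)
            else:
                files.append(entry.path)
    return subdirs, files


def parallel_walk(base_path, list_fn=list_children, max_workers=16):
    """Breadth-first walk that lists all directories of one level concurrently."""
    objects = []
    frontier = [base_path]  # directories waiting to be listed
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while frontier:
            next_frontier = []
            # Every directory in the current level is listed in its own task.
            for subdirs, files in pool.map(list_fn, frontier):
                next_frontier.extend(subdirs)
                objects.extend(files)
            frontier = next_frontier
    return objects
```

The loop still runs once per directory level, so this is the same conditional iteration as in the serial version; only the listing within each level is parallel.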

> Native support for conditional iteration
> ----------------------------------------
>
>                 Key: BEAM-106
>                 URL: https://issues.apache.org/jira/browse/BEAM-106
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Luke Cwik
>            Priority: Major
>
> Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50
> There are a variety of use cases which would benefit from native support for 
> conditional iteration.
> For instance, 
> http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923
>  asks about being able to write a loop like the following:
> {code}
> PCollection data  = ...
> while(needsMoreWork(data)) {
>   data = doAStep(data)
> }
> {code}
> If there are specific use cases please let us know the details. In the future 
> we will use this issue to post progress updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
