[ https://issues.apache.org/jira/browse/BEAM-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989373#comment-16989373 ]
Nishant Trivedi commented on BEAM-106:
--------------------------------------
Hello,
I'm new to Dataflow, and the job I'm trying to write operates on objects
stored in GCS. The problem is that the objects can be stored at an arbitrary
depth, like {{gs://bucket/a/b/c/1/<object1>, gs://bucket/a/b/c/1/<object2>,
gs://bucket/a/b/c/2/<object1>, gs://bucket/a/b/c/2/<object2> ...}}. I have to
collect all objects in a list before the job can operate on them. If I want
to list the objects as a pipeline step, I think it would require conditional
steps, and I don't know how to do that. Is there a way to do conditional
steps? Alternatively, is there a native way of doing a directory walk in
Beam? I'm also attaching a naive version of the {{walk}} below to make my
use case clear:
{code}
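from os import path

from tensorflow.io import gfile


def walker(base_path):
    # Queue of paths still to be inspected, seeded with the top-level entries.
    dir_q = gfile.glob(path.join(base_path, "*"))
    object_q = []
    while len(dir_q) != 0:
        current = dir_q.pop(0)
        if gfile.isdir(current):
            # Directory: enqueue its children for a later iteration.
            dir_q.extend(gfile.glob(path.join(current, "*")))
        else:
            # Plain object: record it in the result list.
            object_q.append(current)
    return object_q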
{code}
I should also mention that there are a couple of million objects, so listing
them serially as above is not very efficient.
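To make the non-serial variant concrete, here is a minimal sketch of the same
walk done level by level, with every directory of a level listed concurrently.
It is plain Python on a thread pool, not Beam code, and it has only been
reasoned about for a local filesystem. The names {{list_dir}},
{{parallel_walk}}, {{lister}} and {{max_workers}} are hypothetical and chosen
for illustration only.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def list_dir(directory):
    """Return (subdirectories, files) found directly under `directory`."""
    subdirs, files = [], []
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_dir():
                subdirs.append(entry.path)
            else:
                files.append(entry.path)
    return subdirs, files


def parallel_walk(base_path, lister=list_dir, max_workers=16):
    """Breadth-first walk that lists all directories of one level concurrently.

    `lister` is a hypothetical hook: any callable that maps a directory to a
    (subdirectories, files) pair can be plugged in.
    """
    frontier = [base_path]
    objects = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Conditional iteration: keep going while a level yields directories.
        while frontier:
            next_frontier = []
            for subdirs, files in pool.map(lister, frontier):
                next_frontier.extend(subdirs)
                objects.extend(files)
            frontier = next_frontier
    return objects
```

For GCS, {{lister}} could presumably wrap the same {{gfile.glob}} and
{{gfile.isdir}} calls used in {{walker}}, but that is an assumption and not
something verified here. The loop still needs a "repeat until no directories
remain" condition, which is exactly the part that seems hard to express as
pipeline steps.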
> Native support for conditional iteration
> ----------------------------------------
>
> Key: BEAM-106
> URL: https://issues.apache.org/jira/browse/BEAM-106
> Project: Beam
> Issue Type: New Feature
> Components: sdk-ideas
> Reporter: Luke Cwik
> Priority: Major
>
> Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50
> There are a variety of use cases which would benefit from native support for
> conditional iteration.
> For instance,
> http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923
> asks about being able to write a loop like the following:
> {code}
> PCollection data = ...
> while (needsMoreWork(data)) {
>   data = doAStep(data)
> }
> {code}
> If there are specific use cases please let us know the details. In the future
> we will use this issue to post progress updates.