Sure thing Jeremy,

Generally, the workflow we do is that every one of our jobs takes a
--consumerMode and/or --producerMode option and we pass those options to a
Read and Write PTransform which wraps standard IO PTransforms and calls the
correct one in expand based on the option's value.

I have a simplified example in slide 26 of my deck from my talk at the most
recent Beam Summit <https://www.youtube.com/watch?v=lvAdyEm9chI>, here:
https://github.com/x/slides/blob/master/beam-summit-2023/Apache%20Beam%20Summit%20-%20Avro%20and%20Beam%20Schemas.pdf

And I have an example of the current version of the code we use at my
company, Oden, in a gist here:
https://gist.github.com/x/948ac95b768671d342cc3856a3d7c681

The main use-case for us is that all of our dataflow jobs run in both a
Streaming (normal) and Batch (recovery) mode.


On Aug 14, 2023 at 3:09:51 PM, Jeremy Bloom <jeremybl...@gmail.com> wrote:

> Thanks. Is there a github link to Devon's code?
>
> On Mon, Aug 14, 2023 at 8:49 AM John Casey <theotherj...@google.com>
> wrote:
>
>> I believe Devon Peticolas wrote a similar tool to create an IO that wrote
>> to configurable sinks that might fit your use case
>>
>> On Sat, Aug 12, 2023 at 12:18 PM Bruno Volpato via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi Jeremy,
>>>
>>> Apparently you are trying to use Beam's DirectRunner
>>> <https://beam.apache.org/documentation/runners/direct/>, which is
>>> mostly focused on small pipelines / testing purposes.
>>> Even if it runs in the JVM, there are protections in place to make sure
>>> your pipeline will be able to be distributed correctly when choosing a
>>> production-ready runner (e.g., Dataflow, Spark, Flink), from the link above:
>>>
>>> - enforcing immutability of elements
>>> - enforcing encodability of elements
>>>
>>> There are ways to disable those checks (--enforceEncodability=false,
>>> --enforceImmutability=false), but to make sure you take the best out of
>>> Beam and can run the pipeline in one of the runners in the future, I
>>> believe the best way would be to write to a file, and read it back in the
>>> GUI application (for the sink part).
>>>
>>> For the source part, you may want to use Create
>>> <https://beam.apache.org/documentation/transforms/java/other/create/>
>>> to create a PCollection with specific elements for the in-memory scenario.
>>>
>>> If you are getting exceptions for supported scenarios that you've
>>> mentioned, there are a few things -- for example, if you are using lambda,
>>> sometimes Java will try to Serialize the entire instance that holds members
>>> being used. Creating your own DoFn classes and passing the Serializables
>>> that what you need to use may resolve.
>>>
>>>
>>> Best,
>>> Bruno
>>>
>>>
>>>
>>>
>>> On Sat, Aug 12, 2023 at 11:34 AM Jeremy Bloom <jeremybl...@gmail.com>
>>> wrote:
>>>
>>>> Hello-
>>>> I am fairly new to Beam but have been working with Apache Spark for a
>>>> number of years. The application I am developing uses a data pipeline to
>>>> ingest JSON with a particular schema, uses it to prepare data for a service
>>>> that I do not control (a mathematical optimization solver), runs the
>>>> application and recovers its results, and then publishes the results in
>>>> JSON (same schema).  Although I work in Java, colleagues of mine are
>>>> implementing in Python. This is an open-source, non-commercial project.
>>>>
>>>> The application has three kinds of IO sources/sinks: file system files
>>>> (using Windows now, but Unix in the future), URL, and in-memory (string,
>>>> byte buffer, etc). The last is primarily used for debugging, displayed in a
>>>> JTextArea.
>>>>
>>>> I have not found a Beam IO connector that handles all three data
>>>> sources/sinks, particularly the in-memory sink. I have tried adapting
>>>> FileIO and TextIO, however, I continually run up against objects that are
>>>> not serializable, particularly Java OutputStream and its subclasses. I have
>>>> looked at the code for FileIO and TextIO as well as several other custom IO
>>>> implementations, but none of them addresses this particular bug.
>>>>
>>>> The CSVSink example in the FileIO Javadoc uses a PrintWriter, which is
>>>> not serializable; when I tried the same thing, I got a not-serializable
>>>> exception. How does this example actually avoid this error? In the code for
>>>> TextIO.Sink, the PrintWriter field is marked transient, meaning that it is
>>>> not serialized, but again, when I tried the same thing, I got an exception.
>>>>
>>>> Please explain, in particular, how to write a Sink that avoids the not
>>>> serializable exception. In general, please explain how I can use a Beam IO
>>>> connector for the three kinds of data sources/sinks I want to use (file
>>>> system, url, and in-memory).
>>>>
>>>> After the frustrations I had with Spark, I have high hopes for Beam.
>>>> This issue is a blocker for me.
>>>>
>>>> Thank you.
>>>> Jeremy Bloom
>>>>
>>>

Reply via email to