On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <[email protected]> wrote: > > Hi Gleb, > > Regarding the future of io.Read: ideally things would go as follows > - All runners support SDF at feature parity with Read (mostly this is just > the Dataflow runner's liquid sharding and size estimation for bounded > sources, and backlog for unbounded sources, but I recall that a couple of > other runners also used size estimation) > - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to > add any new implementations to SDK, and users shouldn't use them either > (note: I believe it's already effectively forbidden to use them for cases > where a DoFn/SDF at the current level of support will be sufficient) > - People one by one rewrite existing Bounded/UnboundedSource based > PTransforms in the SDK to use SDFs instead > - Read.from() is rewritten to use a wrapper SDF over the given Source, and > explicit support for Read is deleted from runners > - In the next major version of Beam - presumably 3.0 - the Read transform > itself is deleted > > I don't know what's the current status of SDF/Read feature parity, maybe Luke > or Cham can comment. An alternative path is offered in > http://s.apache.org/sdf-via-source.
Python supports initial splitting for SDF of all sources on portable runners. Dataflow support for batch SDF is undergoing testing, not yet rolled out. Dataflow support for streaming SDF is awaiting portable state/timer support. > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <[email protected]> wrote: >> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away >> in favor of SDF, or we are always going to have both? >> >> I was looking into AvroIO.read and AvroIO.readAll, both of them use >> AvroSource. AvroIO.readAll is using SDF, and it's implemented with >> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at >> ReadAllViaFileBasedSource I find it not necessary to use Source<?>, it >> should be enough to have something like (KV<ReadableFile, OffsetRange>, >> OutputReceiver<T>), as we have discussed in this thread, and that should be >> fine for SMB as well. It would require duplicating code from AvroSource, but >> in the end, I don't see it as a problem if AvroSource is going away. >> >> I'm attaching a small diagram I put for myself to better understand the code. >> >> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> -> >> >> FileIO.matchAll :: PTransform<PCollection<String>, >> PCollection<MatchResult.Metadata>> >> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, >> PCollection<ReadableFile>> >> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, >> PCollection<T>> -> >> >> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, >> PCollection<T>> -> >> >> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, >> OffsetRange>>) (splittable do fn) >> >> Reshuffle.viaRandomKey() >> >> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, >> OffsetRange>, T>) where >> >> createSource :: String -> FileBasedSource<T> >> >> createSource = AvroSource >> >> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, >> PCollection<T>> -> >> >> Read.Bounded.from(createSource) where >> >> createSource :: String -> FileBasedSource<T> >> >> createSource = AvroSource >> >> >> Gleb >> >> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <[email protected]> wrote: >>> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <[email protected]> wrote: >>> > >>> > From the peanut gallery, keeping a separate implementation for SMB seems >>> > fine. Dependencies are serious liabilities for both upstream and >>> > downstream. It seems like the reuse angle is generating extra work, and >>> > potentially making already-complex implementations more complex, instead >>> > of helping things. >>> >>> +1 >>> >>> To be clear, what I care about is that WriteFiles(X) and >>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text, >>> TFRecord, ...}. In other words composability of the API (vs. manually >>> filling out the matrix). If WriteFiles and WriteSmbFiles find >>> opportunities for (easy, clean) implementation sharing, that'd be >>> nice, but not the primary goal. >>> >>> (Similarly for reading, though that's seem less obvious. Certainly >>> whatever T is useful for ReadSmb(T) could be useful for a >>> (non-liquid-shading) ReadAll(T) however.) >>> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <[email protected]> wrote: >>> >> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be >>> >> determined until the last finalize transform, which is again different >>> >> from the current SMB proposal (static number of buckets & shards). >>> >> I'll end up with more code specialized for SMB in order to generalize >>> >> existing sink code, which I think we all want to avoid. >>> >> >>> >> Seems the only option is duplicating some logic like temp file handling, >>> >> which is exactly what we did in the original PR. >>> >> I can reuse Compression & Sink<T> for file level writes but that seems >>> >> about the most I can reuse right now. >>> >> >>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <[email protected]> wrote: >>> >>> >>> >>> So I spent one afternoon trying some ideas for reusing the last few >>> >>> transforms WriteFiles. >>> >>> >>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, >>> >>> Iterable<UserT>>, FileResult<DestinationT>> >>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, >>> >>> PCollection<List<ResultT>>> >>> >>> => FinalizeTempFileBundles extends >>> >>> PTransform<PCollection<List<FileResult<DestinationT>>>, >>> >>> WriteFilesResult<DestinationT>> >>> >>> >>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> >>> >>> so I can use pre-compute SMB destination file names for the transforms. >>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's >>> >>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are >>> >>> private and easy to change/pull out. >>> >>> >>> >>> OTOH they are somewhat coupled with the package private >>> >>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk >>> >>> of temp file handing logic lives). Might be hard to decouple either >>> >>> modifying existing code or creating new transforms, unless if we >>> >>> re-write most of FileBasedSink from scratch. >>> >>> >>> >>> Let me know if I'm on the wrong track. >>> >>> >>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files >>> >>> >>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath >>> >>> <[email protected]> wrote: >>> >>>> >>> >>>> >>> >>>> >>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <[email protected]> >>> >>>> wrote: >>> >>>>> >>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov >>> >>>>> <[email protected]> wrote: >>> >>>>> > >>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw >>> >>>>> > <[email protected]> wrote: >>> >>>>> >> >>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <[email protected]> >>> >>>>> >> wrote: >>> >>>>> >> > >>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it >>> >>>>> >> > and see what needs to be done. >>> >>>>> >> > >>> >>>>> >> > Eugene pointed out that we shouldn't build on >>> >>>>> >> > FileBased{Source,Sink}. So for writes I'll probably build on top >>> >>>>> >> > of WriteFiles. >>> >>>>> >> >>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right? >>> >>>>> >> >>> >>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779 >>> >>>>> > >>> >>>>> > Yeah if possible, parameterize FileIO.Sink. >>> >>>>> > I would recommend against building on top of WriteFiles either. >>> >>>>> > FileIO being implemented on top of WriteFiles was supposed to be a >>> >>>>> > temporary measure - the longer-term plan was to rewrite it from >>> >>>>> > scratch (albeit with a similar structure) and throw away WriteFiles. >>> >>>>> > If possible, I would recommend to pursue this path: if there are >>> >>>>> > parts of WriteFiles you want to reuse, I would recommend to >>> >>>>> > implement them as new transforms, not at all tied to FileBasedSink >>> >>>>> > (but ok if tied to FileIO.Sink), with the goal in mind that FileIO >>> >>>>> > could be rewritten on top of these new transforms, or maybe parts >>> >>>>> > of WriteFiles could be swapped out for them incrementally. >>> >>>>> >>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at >>> >>>>> the code it feels like there's a lot that was not yet done either, and >>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding >>> >>>>> the right docs). >>> >>>> >>> >>>> >>> >>>> I'm also a bit unfamiliar with original plans for WriteFiles and for >>> >>>> updating source interfaces, but I prefer not significantly modifying >>> >>>> existing IO transforms to suite the SMB use-case. If there are >>> >>>> existing pieces of code that can be easily re-used that is fine, but >>> >>>> existing sources/sinks are designed to perform a PCollection -> file >>> >>>> transformation and vice versa with (usually) runner determined >>> >>>> sharding. Things specific to SMB such as sharding restrictions, >>> >>>> writing metadata to a separate file, reading multiple files from the >>> >>>> same abstraction, does not sound like features that should be included >>> >>>> in our usual file read/write transforms. >>> >>>> >>> >>>>> >>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered >>> >>>>> >> > elements across files within a bucket and TBH I'm not even sure >>> >>>>> >> > where to start. >>> >>>>> >> >>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile -> >>> >>>>> >> Iterable<T>. There are existing >>> >>>>> >> PTransform<PCollection<ReadableFile>, >>> >>>>> >> PCollection<T>> but such an interface is insufficient to extract >>> >>>>> >> ordered records per shard. It seems the only concrete >>> >>>>> >> implementations >>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but >>> >>>>> >> there's no >>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and >>> >>>>> >> cumbersome to call (given the reflection machinery involved in >>> >>>>> >> invoking DoFns). >>> >>>>> > >>> >>>>> > Seems easiest to just define a new regular Java interface for this. >>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something >>> >>>>> > analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends >>> >>>>> > on how much control over iteration you need. >>> >>>>> >>> >>>>> For this application, one wants to iterate over several files in >>> >>>>> parallel. The downside of a new interface is that it shares almost >>> >>>>> nothing with the "normal" sources (e.g. when features (or >>> >>>>> optimizations) get added to one, they won't get added to the other). >>> >>>> >>> >>>> >>> >>>>> >>> >>>>> >>> >>>>> > And yes, DoFn's including SDF's are not designed to be used as Java >>> >>>>> > interfaces per se. If you need DoFn machinery in this interface >>> >>>>> > (e.g. side inputs), use Contextful - s.apache.org/context-fn. >>> >>>>> >>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is >>> >>>>> to build new DoFns out of others (or, really, use them in any context >>> >>>>> other than as an argument to ParDo). >>> >>>>> >>> >>>>> >> > I'll file separate PRs for core changes needed for discussion. >>> >>>>> >> > WDYT? >>> >>>>> >> >>> >>>>> >> Sounds good. >>> >>>> >>> >>>> >>> >>>> +1 >>> >>>> >>> >>>>> >>> >>>>> >> >>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw >>> >>>>> >> > <[email protected]> wrote: >>> >>>>> >> >> >>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li >>> >>>>> >> >> <[email protected]> wrote: >>> >>>>> >> >> > >>> >>>>> >> >> > Forking this thread to discuss action items regarding the >>> >>>>> >> >> > change. We can keep technical discussion in the original >>> >>>>> >> >> > thread. >>> >>>>> >> >> > >>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost >>> >>>>> >> >> > saving improvements and we'd like to adopt it for production >>> >>>>> >> >> > soon (by EOY). We want to contribute it to Beam so it's >>> >>>>> >> >> > better generalized and maintained. We also want to avoid >>> >>>>> >> >> > divergence between our internal version and the PR while it's >>> >>>>> >> >> > in progress, specifically any breaking change in the produced >>> >>>>> >> >> > SMB data. >>> >>>>> >> >> >>> >>>>> >> >> All good goals. >>> >>>>> >> >> >>> >>>>> >> >> > To achieve that I'd like to propose a few action items. >>> >>>>> >> >> > >>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key >>> >>>>> >> >> > handling, bucket file and metadata format, etc., anything >>> >>>>> >> >> > that affect produced SMB data. >>> >>>>> >> >> > 2. Revise the existing PR according to #1 >>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, >>> >>>>> >> >> > Compression, etc., but keep the existing file level >>> >>>>> >> >> > abstraction >>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark >>> >>>>> >> >> > clearly as @experimental >>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, >>> >>>>> >> >> > GroupByKeyAndSortValues, FileIO generalization, key URN, etc. >>> >>>>> >> >> > >>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1 >>> >>>>> >> >> > guarantees that production data produced today are usable >>> >>>>> >> >> > when #5 lands on master. #4 also gives early adopters a >>> >>>>> >> >> > chance to give feedback. >>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a >>> >>>>> >> >> > couple of big PRs to achieve, which we can keep iterating on. >>> >>>>> >> >> > >>> >>>>> >> >> > What are your thoughts on this? >>> >>>>> >> >> >>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions >>> >>>>> >> >> before >>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that would >>> >>>>> >> >> mostly >>> >>>>> >> >> already work, so it's a matter of coming up with an analogous >>> >>>>> >> >> Source >>> >>>>> >> >> interface.) Specifically I would not want to merge a set of per >>> >>>>> >> >> file >>> >>>>> >> >> type smb IOs without a path forward to this or the >>> >>>>> >> >> determination that >>> >>>>> >> >> it's not possible/desirable. >> >> >> >> -- >> Cheers, >> Gleb
