gonzojive commented on issue #21817:
URL: https://github.com/apache/beam/issues/21817#issuecomment-1160028986

   > Thank you for filing the issue!
   > 
   > From your configuration, you've got [6 threads in parallel per 
worker](https://gist.github.com/gonzojive/6a5e32dbc5693770cfd07624f8c55bee#file-flink-conf-yaml-L102)
   > 
   > The short term fix is to process fewer bundles simultaneously, so reducing 
that number. The SDK is largely expecting the Runner to handle how to schedule 
work and similar, so it doesn't have any ability to deny the runner's request 
for processing, other than failing the bundle.
   
   Reducing parallelism (using the --parallelism flag) worked. Thanks!
   
   The unfortunate side effect is that processing slows down a lot, since the earlier stages (which have no memory issues) no longer get parallelized.
   
   > 
   > At present the SDK isn't aware at all about how much memory the system is 
using, as it's unclear how the runner, or the system can handle that.
   > 
   > After all, unless the downloaded files are being streamed straight to the 
output files in the same DoFn, they will have to be in memory for some time.
   > 
   > Is everything being executed on a single machine rather than a cluster? 
What does the pipeline look like? Separated into multiple DoFns? Any 
Aggregations?
   
   Single machine. 
   
   Basically, the pipeline downloads ~1000 files, each of which contains key/value pairs with nearly identical sets of keys. Values are accumulated per key across all of the files to obtain what would be a `map[key][]value` in Go. For each key, one record containing the key and all of its values is written to a record-oriented file (Riegeli). There are ~10,000 keys (each ~10 bytes) and roughly 1000*50 values per key; each value is maybe 10-50 bytes.
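
   For concreteness, here is a minimal sketch of that pipeline shape (all names, the inputs, and the text output are illustrative stand-ins, not the actual code): emit key/value pairs per file, group by key, and format one record per key.

   ```go
   package main

   import (
   	"context"
   	"fmt"
   	"log"
   	"strings"

   	"github.com/apache/beam/sdks/v2/go/pkg/beam"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )

   // extractPairs stands in for the download stage: it emits (key, value)
   // pairs for one input file. The real stage parses a downloaded file.
   func extractPairs(file string, emit func(string, string)) {
   	for i := 0; i < 50; i++ {
   		emit(fmt.Sprintf("key-%d", i), file+"-value")
   	}
   }

   // formatGroup stands in for the Riegeli write: it folds every value for
   // one key into a single output record.
   func formatGroup(key string, values func(*string) bool) string {
   	var all []string
   	var v string
   	for values(&v) {
   		all = append(all, v)
   	}
   	return key + ": " + strings.Join(all, ",")
   }

   func main() {
   	beam.Init()
   	p, s := beam.NewPipelineWithRoot()

   	// ~1000 file names in the real pipeline.
   	files := beam.CreateList(s, []string{"file-1", "file-2"})
   	kvs := beam.ParDo(s, extractPairs, files)
   	grouped := beam.GroupByKey(s, kvs)
   	records := beam.ParDo(s, formatGroup, grouped)
   	// The real pipeline writes sharded Riegeli files rather than text.
   	textio.Write(s, "/tmp/records.txt", records)

   	if err := beamx.Run(context.Background(), p); err != nil {
   		log.Fatal(err)
   	}
   }
   ```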
   
   See below for more information and a failed attempt to reproduce.
   
   > 
   > How big are each of these files? I'll note that short of streaming a 
download directly to a file output, there's going to be buffering at least to 
the size of the file in question.
   > 
   > I will note that the segment of the heap graph you've provided shows none 
of the places where allocations are occurring. 
   
   I tried to reproduce the problem at https://github.com/gonzojive/beam-go-bazel-example but didn't experience exactly the same issues.
   
   Here is a table view of a heap profile from a similar memory problem. The memory explosion happens in the very last stage of the pipeline, when writing to a sharded output file (Riegeli format):
   
   Flat | Flat% | Sum% | Cum | Cum% | Name | Inlined?
   -- | -- | -- | -- | -- | -- | --
   0 | 0.00% | 0.00% | 32475.50MB | 97.58% | google.golang.org/protobuf/proto.UnmarshalOptions.unmarshal |
   0 | 0.00% | 0.00% | 32474.97MB | 97.57% | google.golang.org/protobuf/internal/impl.(*MessageInfo).unmarshalPointer |
   0 | 0.00% | 0.00% | 32474.97MB | 97.57% | google.golang.org/protobuf/internal/impl.(*MessageInfo).unmarshal |
   0 | 0.00% | 0.00% | 32469.47MB | 97.56% | google.golang.org/protobuf/internal/impl.consumeMessageSliceInfo |
   0 | 0.00% | 0.00% | 30612.01MB | 91.98% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4 |
   0 | 0.00% | 0.00% | 30612.01MB | 91.98% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction |
   0 | 0.00% | 0.00% | 30606.51MB | 91.96% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic |
   0 | 0.00% | 0.00% | 30606.51MB | 91.96% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute |
   0 | 0.00% | 0.00% | 30605MB | 91.96% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process |
   1.64MB | 0.00% | 0.00% | 28125.61MB | 84.51% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).makeReStream |
   0 | 0.00% | 0.00% | 28123.98MB | 84.50% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.readStreamToBuffer |
   0.50MB | 0.00% | 0.01% | 28123.98MB | 84.50% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*customDecoder).Decode |
   0 | 0.00% | 0.01% | 28123.48MB | 84.50% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*customDecoder).DecodeTo |
   28107.60MB | 84.45% | 84.46% | 28107.60MB | 84.45% | reflect.New |
   0 | 0.00% | 84.46% | 27749.84MB | 83.38% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*decoderTTypex_TE).Call2x2 |
   0 | 0.00% | 84.46% | 27749.84MB | 83.38% | github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*decoder2x2).Decode |
   0 | 0.00% | 84.46% | 27749.84MB | 83.38% | github.com/apache/beam/sdks/v2/go/pkg/beam.protoDec |
   0 | 0.00% | 84.46% | 27749.34MB | 83.38% | google.golang.org/protobuf/proto.UnmarshalOptions.Unmarshal |
   0 | 0.00% | 84.46% | 19103.76MB | 57.40% | google.golang.org/protobuf/internal/impl.consumeMessageInfo |
   0 | 0.00% | 84.46% | 2660.91MB | 7.99% | google.golang.org/protobuf/proto.UnmarshalOptions.UnmarshalState |
   0 | 0.00% | 84.46% | 2660.91MB | 7.99% | google.golang.org/grpc/encoding/proto.codec.Unmarshal |
   
   `makeReStream` looks like it is eating up a lot of memory. I think this is the function that prepares the data for the following DoFn:
   
   ```go
   // Imports used by this snippet (the package declaration and the Riegeli
   // writer package, which provides riegeli.NewWriter/PutProto/Flush, are
   // omitted as in the original excerpt):
   import (
   	"bufio"
   	"context"
   	"fmt"

   	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
   	"google.golang.org/protobuf/proto"
   )

   // writeFileFn writes every element routed to a shard into a single
   // Riegeli file named <Filename>-<NNNNN>-of-<NNNNN>.
   type writeFileFn[T proto.Message] struct {
   	Filename   string `json:"filename"`
   	ShardCount int    `json:"shardCount"`
   }

   // ProcessElement receives a shard number and an iterator over all protos
   // assigned to that shard, and writes them out as Riegeli records.
   func (w *writeFileFn[T]) ProcessElement(ctx context.Context, shard int, protos func(*T) bool) error {
   	fs, err := filesystem.New(ctx, w.Filename)
   	if err != nil {
   		return err
   	}
   	defer fs.Close()

   	shardName := fmt.Sprintf("%05d-of-%05d", shard+1, w.ShardCount)

   	fd, err := fs.OpenWrite(ctx, w.Filename+"-"+shardName)
   	if err != nil {
   		return err
   	}
   	buf := bufio.NewWriterSize(fd, 1000*1000*5) // use a 5MB write buffer
   	recordWriter := riegeli.NewWriter(buf, nil)

   	// Drain the iterator, writing one record per element.
   	var elem T
   	for protos(&elem) {
   		if err := recordWriter.PutProto(elem); err != nil {
   			return fmt.Errorf("error writing proto to riegeli file: %w", err)
   		}
   	}

   	if err := recordWriter.Flush(); err != nil {
   		return fmt.Errorf("error flushing records to riegeli file: %w", err)
   	}
   	if err := buf.Flush(); err != nil {
   		return fmt.Errorf("error flushing buffered bytes to riegeli file: %w", err)
   	}
   	if err := fd.Close(); err != nil {
   		return fmt.Errorf("error closing riegeli file: %w", err)
   	}
   	return nil
   }
   ```
   
   I'm not sure how the streams behind the `protos func(*T) bool` iterator work (though I'll dig into your comments more at some point to find out). I'm guessing either the runner or the harness is loading too many elements into the iterator stream at once.
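
   To make my mental model concrete, here is a purely illustrative sketch (not the actual Beam code) of the behavior the `makeReStream`/`readStreamToBuffer` entries in the profile suggest: every value is decoded into an in-memory buffer before the DoFn's iterator sees it, so memory grows with the number of values buffered for the bundle.

   ```go
   // Illustrative only; the helper name is hypothetical. bufferAll drains
   // a decode function into a slice and returns an iterator over the
   // slice, so every decoded value stays resident for the whole iteration.
   func bufferAll[T any](next func() (T, bool)) func(*T) bool {
   	var buf []T
   	for {
   		v, ok := next()
   		if !ok {
   			break
   		}
   		buf = append(buf, v) // memory grows with every buffered value
   	}
   	i := 0
   	return func(out *T) bool {
   		if i >= len(buf) {
   			return false
   		}
   		*out = buf[i]
   		i++
   		return true
   	}
   }
   ```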
   
   > 
   > That said, here are some areas to look into depending on the pipeline. TBH, as described, neither of these is likely to help.
   > 
   > As implemented, the SDK will buffer some number of elements per bundle being processed; see [datamgr.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L32). After that, additional elements will not be accepted from the Runner until something has been processed through. This happens using [standard channel blocking](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L454).
   > 
   > The other place where memory might "back up" is the [Combiner Lifting Cache](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/combine.go#L436). This currently uses a map and a fixed cap on eviction size. We would love to make that more memory aware, so that memory pressure evicts elements and allows things to GC. A good mechanism for this hasn't been determined, as in general there's value in keeping the cache as full as possible so that elements are combined before the shuffle.
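
   If I'm reading the datamgr part right, the intake throttling is essentially a bounded channel between the data layer and bundle processing. A toy illustration of that pattern (not Beam's implementation):

   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   func main() {
   	// Bounded buffer standing in for the per-bundle element buffer:
   	// once it is full, the producer's send blocks, so no further
   	// elements are accepted until the consumer makes progress.
   	elements := make(chan int, 4)

   	go func() {
   		for i := 0; i < 16; i++ {
   			elements <- i // blocks while the buffer is full
   		}
   		close(elements)
   	}()

   	for e := range elements {
   		time.Sleep(50 * time.Millisecond) // slow "DoFn"
   		fmt.Println("processed element", e)
   	}
   }
   ```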
   
   Very cool. Do you have any sort of debugging visualizer for the harness? It'd be interesting to see which bundles are active, how many elements there are in each, and what is known about the size of the elements.
   
   Perhaps a structured log could be output and replayed using a visualizer.
   

