The reasoning for the unbounded thread pool is explained in GcsOptions.java:

/* The SDK requires an unbounded thread pool because a step may create X writers
 * each requiring their own thread to perform the writes otherwise a writer may
 * block causing deadlock for the step because the writers buffer is full.
 * Also, the MapTaskExecutor launches the steps in reverse order and completes
 * them in forward order thus requiring enough threads so that each step's writers
 * can be active.
 */
https://github.com/apache/beam/blob/17c2da6d981cae9f233aea1e2d6d64259362dd73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L133-L138

On Thu, Nov 8, 2018 at 11:41 PM Dan Halperin <dhalp...@apache.org> wrote:

>
>> On Thu, Nov 8, 2018 at 2:12 PM Udi Meiri <eh...@google.com> wrote:
>>
>>> Both options risk delaying worker shutdown if the executor's shutdown()
>>> hasn't been called, which is I guess why the executor in GcsOptions.java
>>> creates daemon threads.
>>>
>>
> My guess (and it really is a guess at this point) is that this was a fix
> for DirectRunner issues - want that to exit quickly!
>
>
>>
>>> On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Not certain, it looks like we should have been caching the executor
>>>> within the GcsUtil as a static instance instead of creating one each time.
>>>> Could have been missed during code review / slow code changes over time.
>>>> GcsUtil is not well "loved".
>>>>
>>>> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>>>>
>>>>> HI,
>>>>> I've identified a memory leak when GcsUtil.java instantiates a
>>>>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>>>>> The code uses the getExitingExecutorService
>>>>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551>
>>>>> wrapper,
>>>>> which leaks memory. The question is, why is that wrapper necessary
>>>>> if executor.shutdown(); is later unconditionally called?
>>>>>
>>>>
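To make the deadlock described in the GcsOptions comment concrete, here is a minimal sketch using only java.util.concurrent. WriterDeadlockDemo, runStep and daemonFactory are hypothetical names, not Beam code, and the "writer" does no real I/O. A "step" task submits a "writer" task to the same pool and blocks until it finishes; with a one-thread pool the writer never gets a thread, while an unbounded pool gives it one.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration only: a step that waits on a writer running in the same pool. */
public class WriterDeadlockDemo {

    // Daemon threads so a stalled pool can never keep the JVM alive.
    public static ThreadFactory daemonFactory() {
        return r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        };
    }

    /** Returns true if the step finished within the timeout, false if it appears deadlocked. */
    public static boolean runStep(ExecutorService pool, long timeoutMillis)
            throws InterruptedException {
        Future<?> step = pool.submit(() -> {
            // The step hands its output to a writer on the same pool...
            Future<?> writer = pool.submit(() -> { /* pretend to drain a buffer */ });
            // ...and blocks until the writer is done.
            writer.get();
            return null;
        });
        try {
            step.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the writer is still queued behind the blocked step
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow(); // interrupts a stuck step so the demo can exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // One thread: the step occupies it, so the writer can never run.
        boolean bounded = runStep(Executors.newFixedThreadPool(1, daemonFactory()), 500);
        // Unbounded: the writer gets a fresh thread and the step completes.
        boolean unbounded = runStep(Executors.newCachedThreadPool(daemonFactory()), 500);
        System.out.println("bounded pool finished: " + bounded);
        System.out.println("unbounded pool finished: " + unbounded);
    }
}
```

Run as-is it should print "bounded pool finished: false" followed by "unbounded pool finished: true". One thread is just the smallest case; going by the comment, any fixed size can stall once enough steps hold threads while waiting on their writers.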
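Along the lines of Lukasz's suggestion, a rough sketch of caching one executor as a static instance instead of building a new one on every GcsUtil call. SharedGcsExecutor is a hypothetical class, not the actual BEAM-6018 fix. Guava documents getExitingExecutorService as using daemon threads plus a shutdown hook that waits for them, and a hook registered per call presumably keeps every per-call executor reachable. This sketch assumes daemon threads alone are an acceptable substitute for that hook.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one shared, unbounded, daemon executor instead of one per call. */
public final class SharedGcsExecutor {
    private SharedGcsExecutor() {}

    // Holder idiom: the pool is created lazily and only once, on first use.
    private static final class Holder {
        static final ExecutorService INSTANCE = create();
    }

    private static ExecutorService create() {
        AtomicInteger count = new AtomicInteger();
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "shared-gcs-executor-" + count.getAndIncrement());
            // Daemon threads do not block JVM exit, so no shutdown hook is registered.
            t.setDaemon(true);
            return t;
        };
        // Unbounded: no core threads, no upper limit, and a SynchronousQueue so each
        // task is handed directly to an idle thread or to a newly created one.
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), daemonFactory);
    }

    /** Callers submit work and wait on their futures, but never shut this instance down. */
    public static ExecutorService get() {
        return Holder.INSTANCE;
    }
}
```

The trade-off Udi raises still applies: because the shared pool is never shut down, in-flight work is simply abandoned at JVM exit rather than awaited.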