> For Samza Runner, we are looking to leverage java portable mode to
achieve “split deployment” where runner is independently packaged w/o user
code and user code should only exist in the submission/worker process. I
believe this is supported by portable mode and therefore we would prefer to
use LOOPBACK (for testing) and DOCKER (for production) mode.

Makes sense.

> Is there a way to get BEAM-12227
<https://issues.apache.org/jira/browse/BEAM-12227> prioritized or the
fastest way is to patch it ourselves?

Probably to patch it yourselves. I'd be happy to provide a review if you
need it though.

On Mon, Apr 26, 2021 at 10:47 AM Ke Wu <ke.wu...@gmail.com> wrote:

> That makes sense.
>
> For Samza Runner, we are looking to leverage java portable mode to achieve
> “split deployment” where runner is independently packaged w/o user code and
> user code should only exist in the submission/worker process. I believe
> this is supported by portable mode and therefore we would prefer to use
> LOOPBACK (for testing) and DOCKER (for production) mode.
>
> Is there a way to get BEAM-12227
> <https://issues.apache.org/jira/browse/BEAM-12227> prioritized or the
> fastest way is to patch it ourselves?
>
> Best,
> Ke
>
>
> On Apr 26, 2021, at 10:17 AM, Kyle Weaver <kcwea...@google.com> wrote:
>
> The reason is the Flink and Spark runners are written in Java. So when the
> runner needs to execute user code written in Java, an EMBEDDED environment
> can be started in the runner. Whereas the runner cannot natively execute
> Python code, so it needs to call out to an external process. In the case of
> LOOPBACK, that external process is started by the Python client process
> that submitted the job in the first place.
>
> On Mon, Apr 26, 2021 at 9:57 AM Ke Wu <ke.wu...@gmail.com> wrote:
>
>> Thank you Kyle, I have created BEAM-12227
>> <https://issues.apache.org/jira/browse/BEAM-12227> to track the
>> unimplemented exception.
>>
>> Is there any specific reason that Java tests are using EMBEDDED mode
>> while python usually in LOOPBACK mode?
>>
>> Best,
>> Ke
>>
>> On Apr 23, 2021, at 4:01 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> I couldn't find any existing ticket for this issue (you may be the first
>> to discover it). Feel free to create one with your findings. (FWIW I did
>> find a ticket for documenting portable Java pipelines [1]).
>>
>> For the Flink and Spark runners, we run most of our Java tests using
>> EMBEDDED mode. For portable Samza, you will likely want to use a similar
>> setup [2].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11062
>> [2]
>> https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>>
>> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>>> Thank you, Kyle, for the detailed answer.
>>>
>>> Do we have a ticket track fix the LOOPBACK mode? LOOPBACK mode will be
>>> essential, especially for local testing as Samza Runner adopts portable
>>> mode and we are intended to run it with Java pipeline a lot.
>>>
>>> In addition, I noticed that this issue does not happen every time
>>> LOOPBACK is used, for example:
>>>
>>> Pipeline p = Pipeline.create(options);
>>>
>>> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L), 
>>> KV.of("2", 3L), KV.of("3", 9L)))
>>>     .apply(Sum.longsPerKey())
>>>     .apply(MapElements.via(new PrintFn()));
>>>
>>> p.run().waitUntilFinish();
>>>
>>> Where PrintFn simply prints the result:
>>>
>>> public static class PrintFn extends SimpleFunction<KV<String, Long>, 
>>> String> {
>>>   @Override
>>>   public String apply(KV<String, Long> input) {
>>>     LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>>>     return input.getKey() + ": " + input.getValue();
>>>   }
>>> }
>>>
>>>
>>> This simple pipeline did work in Java LOOPBACK mode.
>>>
>>> Best,
>>> Ke
>>>
>>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> Yes, we can expect to run java pipelines in portable mode. I'm guessing
>>> the method unimplemented exception is a bug, and we haven't caught it
>>> because (as far as I know) we don't test the Java loopback worker.
>>>
>>> As an alternative, you can try building the Java docker environment with
>>> "./gradlew :sdks:java:container:java8:docker" and then use
>>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
>>> be able to access the host filesystem [1].
>>>
>>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
>>> embedded environment assumes the dependencies are already present on the
>>> runner, which will not be the case unless you modify the job server to
>>> depend on the examples module.
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>>
>>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am working on add portability support for Samza Runner and having
>>>> been playing around on the support in Flink and Spark runner recently.
>>>>
>>>> One thing I noticed is the lack of documentation on how to run a java
>>>> pipeline in a portable mode. Almost all document focuses on how to run a
>>>> python pipeline, which is understandable. I believe a java pipeline can be
>>>> executed in portable mode as well so I did some experiments but results are
>>>> not expected and would like to know if they are expected:
>>>>
>>>>
>>>> 1. Add portability module to example so PipelineOptionsFactory can
>>>> recognize PortableRunner:
>>>>
>>>> $ git diff
>>>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>>>> index 62f15ec24b..c9069d3f4f 100644
>>>> --- a/examples/java/build.gradle
>>>> +++ b/examples/java/build.gradle
>>>> @@ -59,6 +59,7 @@ dependencies {
>>>>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>>>>    compile project(":sdks:java:io:google-cloud-platform")
>>>>    compile project(":sdks:java:io:kafka")
>>>> +  compile project(":runners:portability:java")
>>>>    compile project(":sdks:java:extensions:ml")
>>>>    compile library.java.avro
>>>>    compile library.java.bigdataoss_util
>>>>
>>>>
>>>> 2. Bring up the Job Server:
>>>>
>>>> Spark: ./gradlew :runners:spark:3:job-server:runShadow
>>>> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>>>>
>>>> 3. Execute WordCount example:
>>>>
>>>> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
>>>> -Dexec.args="--inputFile=README.md --output=/tmp/output
>>>> --runner=PortableRunner --jobEndpoint=localhost:8099
>>>> --defaultEnvironmentType=LOOPBACK"
>>>>
>>>>
>>>>
>>>> Neither Flink or Spark runner worked for WordCount because of
>>>>
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
>>>> UNIMPLEMENTED: Method
>>>> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker
>>>> is unimplemented
>>>>         at
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>>>>         at
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>>>>         at
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>>>>         at
>>>> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>>>>         at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>>>>         at
>>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>>>>
>>>> Here are couple of questions:
>>>>
>>>> 1. Can we expect to run a java pipeline in portable mode?
>>>> 2. If Yes, is the above exception expected or did I do something wrong?
>>>> 2. Is there any pending work to make Java portability support on par
>>>> with Python?
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>
>>>
>>
>

Reply via email to