> For Samza Runner, we are looking to leverage java portable mode to achieve “split deployment” where runner is independently packaged w/o user code and user code should only exist in the submission/worker process. I believe this is supported by portable mode and therefore we would prefer to use LOOPBACK (for testing) and DOCKER (for production) mode.

Makes sense.

> Is there a way to get BEAM-12227 <https://issues.apache.org/jira/browse/BEAM-12227> prioritized or the fastest way is to patch it ourselves?

Probably to patch it yourselves. I'd be happy to provide a review if you need it though.

On Mon, Apr 26, 2021 at 10:47 AM Ke Wu <ke.wu...@gmail.com> wrote:

> That makes sense.
>
> For Samza Runner, we are looking to leverage java portable mode to achieve
> “split deployment” where runner is independently packaged w/o user code and
> user code should only exist in the submission/worker process. I believe
> this is supported by portable mode and therefore we would prefer to use
> LOOPBACK (for testing) and DOCKER (for production) mode.
>
> Is there a way to get BEAM-12227
> <https://issues.apache.org/jira/browse/BEAM-12227> prioritized or the
> fastest way is to patch it ourselves?
>
> Best,
> Ke
>
>
> On Apr 26, 2021, at 10:17 AM, Kyle Weaver <kcwea...@google.com> wrote:
>
> The reason is the Flink and Spark runners are written in Java. So when the
> runner needs to execute user code written in Java, an EMBEDDED environment
> can be started in the runner. Whereas the runner cannot natively execute
> Python code, so it needs to call out to an external process. In the case of
> LOOPBACK, that external process is started by the Python client process
> that submitted the job in the first place.
>
> On Mon, Apr 26, 2021 at 9:57 AM Ke Wu <ke.wu...@gmail.com> wrote:
>
>> Thank you Kyle, I have created BEAM-12227
>> <https://issues.apache.org/jira/browse/BEAM-12227> to track the
>> unimplemented exception.
>>
>> Is there any specific reason that Java tests are using EMBEDDED mode
>> while python usually in LOOPBACK mode?
>>
>> Best,
>> Ke
>>
>> On Apr 23, 2021, at 4:01 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> I couldn't find any existing ticket for this issue (you may be the first
>> to discover it). Feel free to create one with your findings. (FWIW I did
>> find a ticket for documenting portable Java pipelines [1]).
>>
>> For the Flink and Spark runners, we run most of our Java tests using
>> EMBEDDED mode. For portable Samza, you will likely want to use a similar
>> setup [2].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11062
>> [2] https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>>
>> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>>> Thank you, Kyle, for the detailed answer.
>>>
>>> Do we have a ticket track fix the LOOPBACK mode? LOOPBACK mode will be
>>> essential, especially for local testing as Samza Runner adopts portable
>>> mode and we are intended to run it with Java pipeline a lot.
>>>
>>> In addition, I noticed that this issue does not happen every time
>>> LOOPBACK is used, for example:
>>>
>>> Pipeline p = Pipeline.create(options);
>>>
>>> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
>>>         KV.of("2", 3L), KV.of("3", 9L)))
>>>     .apply(Sum.longsPerKey())
>>>     .apply(MapElements.via(new PrintFn()));
>>>
>>> p.run().waitUntilFinish();
>>>
>>> Where PrintFn simply prints the result:
>>>
>>> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>>>   @Override
>>>   public String apply(KV<String, Long> input) {
>>>     LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>>>     return input.getKey() + ": " + input.getValue();
>>>   }
>>> }
>>>
>>>
>>> This simple pipeline did work in Java LOOPBACK mode.
>>>
>>> Best,
>>> Ke
>>>
>>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> Yes, we can expect to run java pipelines in portable mode. I'm guessing
>>> the method unimplemented exception is a bug, and we haven't caught it
>>> because (as far as I know) we don't test the Java loopback worker.
>>>
>>> As an alternative, you can try building the Java docker environment with
>>> "./gradlew :sdks:java:container:java8:docker" and then use
>>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
>>> be able to access the host filesystem [1].
>>>
>>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
>>> embedded environment assumes the dependencies are already present on the
>>> runner, which will not be the case unless you modify the job server to
>>> depend on the examples module.
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>>
>>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am working on add portability support for Samza Runner and having
>>>> been playing around on the support in Flink and Spark runner recently.
>>>>
>>>> One thing I noticed is the lack of documentation on how to run a java
>>>> pipeline in a portable mode. Almost all document focuses on how to run a
>>>> python pipeline, which is understandable. I believe a java pipeline can be
>>>> executed in portable mode as well so I did some experiments but results are
>>>> not expected and would like to know if they are expected:
>>>>
>>>>
>>>> 1. Add portability module to example so PipelineOptionsFactory can
>>>> recognize PortableRunner:
>>>>
>>>> $ git diff
>>>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>>>> index 62f15ec24b..c9069d3f4f 100644
>>>> --- a/examples/java/build.gradle
>>>> +++ b/examples/java/build.gradle
>>>> @@ -59,6 +59,7 @@ dependencies {
>>>>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>>>>    compile project(":sdks:java:io:google-cloud-platform")
>>>>    compile project(":sdks:java:io:kafka")
>>>> +  compile project(":runners:portability:java")
>>>>    compile project(":sdks:java:extensions:ml")
>>>>    compile library.java.avro
>>>>    compile library.java.bigdataoss_util
>>>>
>>>>
>>>> 2. Bring up the Job Server:
>>>>
>>>> Spark: ./gradlew :runners:spark:3:job-server:runShadow
>>>> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>>>>
>>>> 3. Execute WordCount example:
>>>>
>>>> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
>>>> -Dexec.args="--inputFile=README.md --output=/tmp/output
>>>> --runner=PortableRunner --jobEndpoint=localhost:8099
>>>> --defaultEnvironmentType=LOOPBACK"
>>>>
>>>>
>>>>
>>>> Neither Flink or Spark runner worked for WordCount because of
>>>>
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>>>>     at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>>>>     at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>>>>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>>>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>>>>
>>>> Here are couple of questions:
>>>>
>>>> 1. Can we expect to run a java pipeline in portable mode?
>>>> 2. If Yes, is the above exception expected or did I do something wrong?
>>>> 3. Is there any pending work to make Java portability support on par
>>>> with Python?
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>
>>>
>>
>
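
For reference, here is a minimal sketch of how the LOOPBACK (testing) versus DOCKER (production) choice discussed in this thread could be kept in one place on the submission side. The class and method names (PortableArgs, portableArgs, environmentFor) are hypothetical helpers made up for illustration and are not part of Beam; the only things taken from the thread are the three flags and the localhost:8099 endpoint from the WordCount command.

```java
import java.util.ArrayList;
import java.util.List;

public class PortableArgs {
    /** Environment types mentioned in this thread (hypothetical enum, not a Beam type). */
    enum EnvironmentType { LOOPBACK, DOCKER, EMBEDDED }

    /** Builds the PortableRunner flags quoted in the WordCount command in this thread. */
    static List<String> portableArgs(String jobEndpoint, EnvironmentType env) {
        List<String> args = new ArrayList<>();
        args.add("--runner=PortableRunner");
        args.add("--jobEndpoint=" + jobEndpoint);
        args.add("--defaultEnvironmentType=" + env.name());
        return args;
    }

    /** Assumed policy from the thread: DOCKER for production, LOOPBACK for local testing. */
    static EnvironmentType environmentFor(boolean production) {
        return production ? EnvironmentType.DOCKER : EnvironmentType.LOOPBACK;
    }

    public static void main(String[] argv) {
        // Pass "prod" as the first argument to select the DOCKER environment.
        boolean production = argv.length > 0 && argv[0].equals("prod");
        List<String> args = portableArgs("localhost:8099", environmentFor(production));
        System.out.println(String.join(" ", args));
    }
}
```

The resulting list could then be handed to PipelineOptionsFactory.fromArgs(...) in the submission process, assuming the portability module is on the classpath as in step 1 of the original message.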