Hi All,
I am working on adding portability support to the Samza runner and have been
playing around with the portability support in the Flink and Spark runners recently.
One thing I noticed is the lack of documentation on how to run a Java pipeline
in portable mode. Almost all of the documentation focuses on how to run a Python
pipeline, which is understandable. I believe a Java pipeline can be executed in
portable mode as well, so I did some experiments, but the results were not what I
expected, and I would like to know whether they are expected:
1. Add the portability module to the examples so that PipelineOptionsFactory can
recognize PortableRunner:
$ git diff
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 62f15ec24b..c9069d3f4f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -59,6 +59,7 @@ dependencies {
 compile project(":sdks:java:extensions:google-cloud-platform-core")
 compile project(":sdks:java:io:google-cloud-platform")
 compile project(":sdks:java:io:kafka")
+compile project(":runners:portability:java")
 compile project(":sdks:java:extensions:ml")
 compile library.java.avro
 compile library.java.bigdataoss_util
2. Bring up the Job Server:
Spark: ./gradlew :runners:spark:3:job-server:runShadow
Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
3. Execute the WordCount example:
./gradlew execute -DmainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner \
  --jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"
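For anyone reproducing this: before step 3, a quick way to rule out a plain
connection problem is to check that something is listening on the port passed
as --jobEndpoint. Below is a minimal, standard-library-only sketch; the class
JobEndpointProbe and its methods are hypothetical helpers, not part of Beam,
and a successful connect only shows that the port is open, not that the
listener is a Beam job service.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Beam: checks whether anything accepts TCP
// connections at a "host:port" endpoint such as the --jobEndpoint value.
public class JobEndpointProbe {

  // Index of the colon separating host and port; rejects malformed input.
  private static int separator(String endpoint) {
    int colon = endpoint.lastIndexOf(':');
    if (colon <= 0 || colon == endpoint.length() - 1) {
      throw new IllegalArgumentException("Expected host:port, got: " + endpoint);
    }
    return colon;
  }

  // Host part, e.g. "localhost" for "localhost:8099".
  static String host(String endpoint) {
    return endpoint.substring(0, separator(endpoint));
  }

  // Port part, e.g. 8099 for "localhost:8099".
  static int port(String endpoint) {
    return Integer.parseInt(endpoint.substring(separator(endpoint) + 1));
  }

  // Attempts a plain TCP connect within the timeout. Success only means the
  // port is open; it does not prove the listener is a Beam job service.
  static boolean isReachable(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // Covers refused connections, timeouts and unresolvable hosts.
      return false;
    }
  }

  public static void main(String[] args) {
    String endpoint = args.length > 0 ? args[0] : "localhost:8099";
    boolean open = isReachable(host(endpoint), port(endpoint), 2000);
    System.out.println(endpoint + (open ? " accepts connections" : " is not reachable"));
  }
}
```

On Java 11+ the file can be run directly with
`java JobEndpointProbe.java localhost:8099` while the job server from step 2 is up.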
Neither the Flink nor the Spark runner worked for WordCount; both failed with:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
    at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
    at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
Here are a couple of questions:
1. Can we expect to run a Java pipeline in portable mode?
2. If yes, is the above exception expected, or did I do something wrong?
3. Is there any pending work to bring Java portability support on par with
Python?
Best,
Ke