[
https://issues.apache.org/jira/browse/BEAM-7967?focusedWorklogId=307489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307489
]
ASF GitHub Bot logged work on BEAM-7967:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Sep/19 22:18
Start Date: 05/Sep/19 22:18
Worklog Time Spent: 10m
Work Description: ibzib commented on pull request #9408: [BEAM-7967]
Execute portable Flink application jar
URL: https://github.com/apache/beam/pull/9408#discussion_r321506835
##########
File path:
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
##########
@@ -51,4 +83,102 @@
ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json";
static final String PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json";
static final String PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH +
"/pipeline-options.json";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PortablePipelineJarCreator.class);
+
+ private static InputStream getResourceFromClassPath(String resourcePath)
throws IOException {
+ InputStream inputStream =
PortablePipelineJarUtils.class.getResourceAsStream(resourcePath);
+ if (inputStream == null) {
+ throw new FileNotFoundException(
+ String.format("Resource %s not found on classpath.", resourcePath));
+ }
+ return inputStream;
+ }
+
+ /** Populates {@code builder} using the JSON resource specified by {@code
resourcePath}. */
+ private static void parseJsonResource(String resourcePath, Builder builder)
throws IOException {
+ try (InputStream inputStream = getResourceFromClassPath(resourcePath)) {
+ String contents = new String(ByteStreams.toByteArray(inputStream),
StandardCharsets.UTF_8);
+ JsonFormat.parser().merge(contents, builder);
+ }
+ }
+
+ public static Pipeline getPipelineFromClasspath() throws IOException {
+ Pipeline.Builder builder = Pipeline.newBuilder();
+ parseJsonResource("/" + PIPELINE_PATH, builder);
+ return builder.build();
+ }
+
+ public static Struct getPipelineOptionsFromClasspath() throws IOException {
+ Struct.Builder builder = Struct.newBuilder();
+ parseJsonResource("/" + PIPELINE_OPTIONS_PATH, builder);
+ return builder.build();
+ }
+
+ public static ProxyManifest getArtifactManifestFromClassPath() throws
IOException {
+ ProxyManifest.Builder builder = ProxyManifest.newBuilder();
+ parseJsonResource("/" + ARTIFACT_MANIFEST_PATH, builder);
+ return builder.build();
+ }
+
+ /** Writes artifacts listed in {@code proxyManifest}. */
+ public static String stageArtifacts(
+ ProxyManifest proxyManifest,
+ PipelineOptions options,
+ String invocationId,
+ String artifactStagingPath)
+ throws Exception {
+ Collection<StagedFile> filesToStage =
+ prepareArtifactsForStaging(proxyManifest, options, invocationId);
+ try (GrpcFnServer artifactServer =
+ GrpcFnServer.allocatePortAndCreateFor(
+ new BeamFileSystemArtifactStagingService(),
InProcessServerFactory.create())) {
+ ManagedChannel grpcChannel =
+ InProcessManagedChannelFactory.create()
+ .forDescriptor(artifactServer.getApiServiceDescriptor());
+ ArtifactServiceStager stager =
ArtifactServiceStager.overChannel(grpcChannel);
+ String stagingSessionToken =
+ BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+ invocationId, artifactStagingPath);
+ String retrievalToken = stager.stage(stagingSessionToken, filesToStage);
+ // Clean up.
+ for (StagedFile file : filesToStage) {
+ if (!file.getFile().delete()) {
+ LOG.warn("Failed to delete file {}", file.getFile());
+ }
+ }
+ grpcChannel.shutdown();
+ return retrievalToken;
+ }
+ }
+
+ /**
+ * Artifacts are expected to exist as resources on the classpath, located
using {@code
+ * proxyManifest}. Write them to tmp files so they can be staged.
+ */
+ private static Collection<StagedFile> prepareArtifactsForStaging(
+ ProxyManifest proxyManifest, PipelineOptions options, String
invocationId)
+ throws IOException {
+ List<StagedFile> filesToStage = new ArrayList<>();
+ Path outputFolderPath =
+ Paths.get(
+ MoreObjects.firstNonNull(
+ options.getTempLocation(),
System.getProperty("java.io.tmpdir")),
+ invocationId);
+ if (!outputFolderPath.toFile().mkdir()) {
+ throw new IOException("Failed to create folder " + outputFolderPath);
+ }
+ for (Location location : proxyManifest.getLocationList()) {
+ try (InputStream inputStream =
getResourceFromClassPath(location.getUri())) {
Review comment:
As far as I know, the jar's contents only exist as class path resources
(streams), not regular files, and the artifact staging code expects that we are
staging regular files.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 307489)
Time Spent: 2h 50m (was: 2h 40m)
> Execute portable Flink application jar
> --------------------------------------
>
> Key: BEAM-7967
> URL: https://issues.apache.org/jira/browse/BEAM-7967
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Kyle Weaver
> Assignee: Kyle Weaver
> Priority: Major
> Labels: portability-flink
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> [https://docs.google.com/document/d/1kj_9JWxGWOmSGeZ5hbLVDXSTv-zBrx4kQRqOq85RYD4/edit#heading=h.oes73844vmhl]
--
This message was sent by Atlassian Jira
(v8.3.2#803003)