[
https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15031850#comment-15031850
]
ASF GitHub Bot commented on FLINK-2522:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1412#discussion_r46149442
--- Diff:
flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by nikste on 8/12/15.
+ */
+public class ScalaShellRemoteStreamEnvironment extends
RemoteStreamEnvironment {
+ private static final Logger LOG =
LoggerFactory.getLogger(ScalaShellRemoteStreamEnvironment.class);
+ // reference to Scala Shell, for access to virtual directory
+ private FlinkILoop flinkILoop;
+ /**
+ * Creates a new RemoteStreamEnvironment that points to the master
+ * (JobManager) described by the given host name and port.
+ *
+ * @param host The host name or address of the master (JobManager),
where the
+ * program should be executed.
+ * @param port The port of the master (JobManager), where the program
should
+ * be executed.
+ * @param jarFiles The JAR files with code that needs to be shipped to
the
+ * cluster. If the program uses
user-defined functions,
+ * user-defined input formats, or any
libraries, those must be
+ */
+ public ScalaShellRemoteStreamEnvironment(String host, int port,
FlinkILoop flinkILoop, String... jarFiles) {
+ super(host, port, jarFiles);
+ this.flinkILoop = flinkILoop;
+ }
+ /**
+ * compiles jars from files in the shell virtual directory on the fly,
sends and executes it in the remote stream environment
+ *
+ * @return Result of the computation
+ * @throws ProgramInvocationException
+ */
+ @Override
+ public JobExecutionResult execute() throws Exception {
+ prepareJars();
+ return(super.execute());
+ }
+ /**
+ * prepares the user generated code from the shell to be shipped to
JobManager
+ * (i.e. save it into jarFiles of this object)
+ */
+ private void prepareJars() throws MalformedURLException {
+ String jarFile =
flinkILoop.writeFilesToDisk().getAbsolutePath();
+
+ // get "external jars, and add the shell command jar, pass to
executor
+ List<String> alljars = new ArrayList<String>();
+ // get external (library) jars
+ String[] extJars = this.flinkILoop.getExternalJars();
+
+ if(!ArrayUtils.isEmpty(extJars)) {
+ alljars.addAll(Arrays.asList(extJars));
+ }
+ // add shell commands
+ alljars.add(jarFile);
+ String[] alljarsArr = new String[alljars.size()];
+ alljarsArr = alljars.toArray(alljarsArr);
+ for (String jarF : alljarsArr) {
+ URL fileUrl = new URL("file://" + jarF);
+ System.out.println("sending:" + fileUrl);
+ try {
+ JobWithJars.checkJarFile(fileUrl);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Problem with jar
file " + fileUrl, e);
+ }
+ jarFiles.add(fileUrl);
+ }
+ }
+ /**
+ * compiles jars from files in the shell virtual directory on the fly,
sends and executes it in the remote stream environment
+ * @param jobName name of the job as string
+ * @return Result of the computation
+ * @throws ProgramInvocationException
+ */
+ @Override
+ public JobExecutionResult execute(String jobName) throws
ProgramInvocationException {
+ try {
+ prepareJars();
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+
+ JobExecutionResult jer = null;
+ try {
+ jer = super.execute(jobName);
+ } catch (Exception e) {
+ e.printStackTrace();
--- End diff --
I think printing the stack trace only without the message is not helpful.
> Integrate Streaming Api into Flink-scala-shell
> ----------------------------------------------
>
> Key: FLINK-2522
> URL: https://issues.apache.org/jira/browse/FLINK-2522
> Project: Flink
> Issue Type: Improvement
> Components: Scala Shell
> Reporter: Nikolaas Steenbergen
> Assignee: Nikolaas Steenbergen
>
> startup scala shell with "-s" or "-streaming" flag to use the streaming api
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)