Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/573#discussion_r27822552
  
    --- Diff: 
flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
 ---
    @@ -0,0 +1,362 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.stormcompatibility.api;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.JobSubmissionResult;
    +import org.apache.flink.client.program.Client;
    +import org.apache.flink.client.program.JobWithJars;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
    +
    +import scala.Some;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.pattern.Patterns;
    +import akka.util.Timeout;
    +import backtype.storm.Config;
    +import backtype.storm.generated.AlreadyAliveException;
    +import backtype.storm.generated.ClusterSummary;
    +import backtype.storm.generated.InvalidTopologyException;
    +import backtype.storm.generated.KillOptions;
    +import backtype.storm.generated.NotAliveException;
    +import backtype.storm.generated.SubmitOptions;
    +import backtype.storm.utils.NimbusClient;
    +import backtype.storm.utils.Utils;
    +
    +
    +
    +
    +
    +/**
    + * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link 
backtype.storm.generated.Nimbus.Client
    + * Nimbus.Client} at once, to interact with Flink's JobManager instead of 
Storm's Nimbus.
    + * 
    + * @author mjsax
    + */
    +public class FlinkClient {
    +   /**
    +    * Configuration key to specify the location of the file 
storm-core-0.9.4.jar (for org.apache.storm).
    +    */
    +   public final static String FLINK_STROM_CORE_JAR_LOCATION = 
"flink.storm.jar-location.storm-core";
    +   /**
    +    * Configuration key to specify the location of the file 
json-simple-1.1.jar (for com.googlecode.json-simple).
    +    */
    +   public final static String FLINK_STROM_JSON_SIMPLE_JAR_LOCATION = 
"flink.storm.jar-location.json-simple";
    +   
    +   
    +   
    +   /**
    +    * The users specified configuration.
    +    */
    +   private final Map<?, ?> config;
    +   /**
    +    * The jobmanager's host name.
    +    */
    +   private final String jobManagerHost;
    +   /**
    +    * The jobmanager's rpc port.
    +    */
    +   private final int jobManagerPort;
    +   /**
    +    * The user specified timeout in milliseconds.
    +    */
    +   private final String timeout;
    +   /**
    +    * Maps the names of running topologies to the corresponding Flink 
{@link JobID}.
    +    */
    +   private final Map<String, JobID> topologyNameToJobId = new 
HashMap<String, JobID>();
    +   /**
    +    * Maps the names of running topologies to the corresponding {@link 
FlinkTopology}.
    +    */
    +   private final Map<String, FlinkTopology> topologies = new 
HashMap<String, FlinkTopology>();
    +   
    +   
    +   
    +   /*
    +    * The following methods are derived from 
"backtype.storm.utils.NimbusClient"
    +    */
    +   
    +   /**
    +    * Instantiates a new {@link FlinkClient} for the given configuration, 
host name, and port. If values for
    +    * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of 
the given configuration are ignored.
    +    * 
    +    * @param conf
    +    *            A configuration.
    +    * @param host
    +    *            The jobmanager's host name.
    +    * @param port
    +    *            The jobmanager's rpc port.
    +    */
    +   public FlinkClient(final Map<?, ?> conf, final String host, final int 
port) {
    +           this(conf, host, port, null);
    +   }
    +   
    +   /**
    +    * Instantiates a new {@link FlinkClient} for the given configuration, 
host name, and port. If values for
    +    * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of 
the given configuration are ignored.
    +    * 
    +    * @param conf
    +    *            A configuration.
    +    * @param host
    +    *            The jobmanager's host name.
    +    * @param port
    +    *            The jobmanager's rpc port.
    +    * @param timeout
    +    */
    +   public FlinkClient(final Map<?, ?> conf, final String host, final int 
port, final Integer timeout) {
    +           this.config = conf;
    +           this.jobManagerHost = host;
    +           this.jobManagerPort = port;
    +           if(timeout != null) {
    +                   this.timeout = timeout + " ms";
    +           } else {
    +                   this.timeout = null;
    +           }
    +   }
    +   
    +   
    +   
    +   /**
    +    * Returns a {@link FlinkClient} that uses the configured {@link 
Config#NIMBUS_HOST} and
    +    * {@link Config#NIMBUS_THRIFT_PORT} as JobManager address.
    +    * 
    +    * @param conf
    +    *            Configuration that contains the jobmanager's hostname and 
port.
    +    * 
    +    * @return A configured {@link FlinkClient}.
    +    */
    +   public static FlinkClient 
getConfiguredClient(@SuppressWarnings("rawtypes") final Map conf) {
    +           final String nimbusHost = (String)conf.get(Config.NIMBUS_HOST);
    +           final int nimbusPort = 
Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
    +           return new FlinkClient(conf, nimbusHost, nimbusPort);
    +   }
    +   
    +   
    +   
    +   /**
    +    * Return a reference to itself.
    +    * 
    +    * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link 
backtype.storm.generated.Nimubs.Client
    +    * Nimbus.Client}, at once.
    +    * 
    +    * @return A reference to itself.
    +    */
    +   public FlinkClient getClient() {
    +           return this;
    +   }
    +   
    +   public void close() { /* nothing to do */}
    +   
    +   
    +   /*
    +    * The following methods are derived from 
"backtype.storm.generated.Nimubs.Client"
    +    */
    +   
    +   /**
    +    * Parameter {@code uploadedJarLocation} is actually used to point to 
the local jar, because Flink does not support
    +    * uploading a jar file before hand. Jar files are always uploaded 
directly when a program is submitted.
    +    */
    +   public void submitTopology(final String name, final String 
uploadedJarLocation, final String jsonConf, final FlinkTopology topology)
    +           throws AlreadyAliveException, InvalidTopologyException {
    +           this.submitTopologyWithOpts(name, uploadedJarLocation, 
jsonConf, topology, null);
    +   }
    +   
    +   /**
    +    * Parameter {@code uploadedJarLocation} is actually used to point to 
the local jar, because Flink does not support
    +    * uploading a jar file before hand. Jar files are always uploaded 
directly when a program is submitted.
    +    */
    +   @SuppressWarnings("unused")
    +   public void submitTopologyWithOpts(final String name, final String 
uploadedJarLocation, final String jsonConf, final FlinkTopology topology, final 
SubmitOptions options)
    +           throws AlreadyAliveException, InvalidTopologyException {
    +           
    +           // TODO improve by checking job manager, if running
    +           if(this.topologyNameToJobId.containsKey(name)) {
    +                   throw new AlreadyAliveException();
    +           }
    +           
    +           // TODO refactor and combine shared code (copied from 
RemoveStreamEnvironment)
    +           try {
    +                   final JobGraph jobGraph = 
topology.getStreamGraph().getJobGraph(name);
    +                   
    +                   // TODO improve error handling for missing config 
values and uploadedJarLocation==null
    +                   final String stormCoreJar = 
(String)this.config.get(FLINK_STROM_CORE_JAR_LOCATION);
    +                   final String jsonSimpleJar = 
(String)this.config.get(FLINK_STROM_JSON_SIMPLE_JAR_LOCATION);
    +                   final String[] jarFileNames = new String[] 
{stormCoreJar, jsonSimpleJar, uploadedJarLocation};
    +                   
    +                   final List<File> jarFiles = new ArrayList<File>();
    +                   for(int i = 0; i < jarFileNames.length; i++) {
    +                           final File file = new File(jarFileNames[i]);
    +                           try {
    +                                   JobWithJars.checkJarFile(file);
    +                           } catch(final IOException e) {
    +                                   throw new RuntimeException("Problem 
with jar file " + file.getAbsolutePath(), e);
    +                           }
    +                           jarFiles.add(file);
    +                   }
    +                   
    +                   for(final File file : jarFiles) {
    +                           jobGraph.addJar(new 
Path(file.getAbsolutePath()));
    +                   }
    +                   
    +                   final Configuration configuration = 
jobGraph.getJobConfiguration();
    +                   final Client client = new Client(new 
InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
    +                           configuration, 
JobWithJars.buildUserCodeClassLoader(jarFiles, 
JobWithJars.class.getClassLoader()), -1);
    +                   
    +                   try {
    +                           final JobSubmissionResult result = 
client.run(jobGraph, false);
    +                           this.topologyNameToJobId.put(name, 
result.getJobID());
    +                           this.topologies.put(name, topology);
    +                   } catch(final ProgramInvocationException e) {
    +                           throw new RuntimeException("Cannot execute job 
due to ProgramInvocationException", e);
    +                   }
    +                   
    +           } catch(final Exception e) {
    +                   e.printStackTrace();
    +           }
    +   }
    +   
    +   public void killTopology(final String name) throws NotAliveException {
    +           this.killTopologyWithOpts(name, null);
    +   }
    +   
    +   @SuppressWarnings("unused")
    +   public void killTopologyWithOpts(final String name, final KillOptions 
options) throws NotAliveException {
    +           final Configuration configuration = 
GlobalConfiguration.getConfiguration();
    +           if(this.timeout != null) {
    +                   
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
    +           }
    +           final FiniteDuration askTimeout = 
AkkaUtils.getTimeout(configuration);
    +           
    +           ActorSystem actorSystem;
    +           try {
    +                   final scala.Tuple2<String, Object> systemEndpoint = new 
scala.Tuple2<String, Object>("", new Integer(0));
    +                   actorSystem = 
AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, 
Object>>(
    +                           systemEndpoint));
    +           } catch(final Exception e) {
    +                   throw new RuntimeException("Could not start actor 
system to communicate with JobManager", e);
    +           }
    +           
    +           ActorRef jobManager;
    +           try {
    +                   jobManager = 
JobManager.getJobManagerRemoteReference(new 
InetSocketAddress(this.jobManagerHost,
    +                           this.jobManagerPort), actorSystem, 
AkkaUtils.getLookupTimeout(configuration));
    +                   
    +                   if(options != null) {
    +                           try {
    +                                   Thread.sleep(1000 * 
options.get_wait_secs());
    +                           } catch(final InterruptedException e) {
    +                                   e.printStackTrace();
    +                           }
    +                   }
    +                   
    +                   final Future<Object> response = 
Patterns.ask(jobManager, new CancelJob(this.topologyNameToJobId.get(name)),
    +                           new Timeout(askTimeout));
    +                   
    +                   try {
    +                           Await.result(response, askTimeout);
    +                   } catch(final Exception e) {
    +                           throw new RuntimeException("Killing topology " 
+ name + " with Flink job ID "
    +                                   + this.topologyNameToJobId.get(name) + 
" failed.", e);
    +                   }
    +                   
    +                   this.topologyNameToJobId.remove(name);
    +                   this.topologies.remove(name);
    +           } catch(final IOException e) {
    +                   throw new RuntimeException("Could not connect to Flink 
JobManager with address " + this.jobManagerHost
    +                           + ":" + this.jobManagerPort, e);
    +           }
    +   }
    +   
    +   // not sure if the following methods can be supported or not...
    +   
    +   // public void activate(final String name) throws NotAliveException {}
    +   
    +   // public void deactivate(final String name) throws NotAliveException {}
    +   
    +   // public void rebalance(final String name, final RebalanceOptions 
options) throws NotAliveException,
    +   // InvalidTopologyException {}
    +   
    +   // public String beginFileUpload() {
    +   // return null;
    +   // }
    +   
    +   // public void uploadChunk(final String location, final ByteBuffer 
chunk) {}
    +   
    +   // public void finishFileUpload(final String location) {}
    +   
    +   // public String beginFileDownload(final String file) {
    +   // return null;
    +   // }
    +   
    +   // public ByteBuffer downloadChunk(final String id) {
    +   // return null;
    +   // }
    +   
    +   // public void send_downloadChunk(final String id) {}
    +   
    +   // public String getNimbusConf() {
    +   // return null;
    +   // }
    --- End diff --
    
    I would like to keep it, or change it to throw 
UnsupportedOperationException(). This ensures that the user is aware of which 
operations are supported (or not supported) by the compatibility layer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to