http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/Constants.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java deleted file mode 100644 index 35c252f..0000000 --- a/storm-core/src/jvm/backtype/storm/Constants.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package backtype.storm; - -import backtype.storm.coordination.CoordinatedBolt; -import clojure.lang.RT; - - -public class Constants { - public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; - - public static final long SYSTEM_TASK_ID = -1; - public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]"); - public static final String SYSTEM_COMPONENT_ID = "__system"; - public static final String SYSTEM_TICK_STREAM_ID = "__tick"; - public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; - public static final String METRICS_STREAM_ID = "__metrics"; - public static final String METRICS_TICK_STREAM_ID = "__metrics_tick"; - public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials"; -} -
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ICredentialsListener.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java deleted file mode 100644 index 1a7bc1b..0000000 --- a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm; - -import java.util.Map; - -/** - * Allows a bolt or a spout to be informed when the credentials of the topology have changed. - */ -public interface ICredentialsListener { - /** - * Called when the credentials of a topology have changed. - * @param credentials the new credentials, could be null. 
- */ - public void setCredentials(Map<String,String> credentials); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ILocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/ILocalCluster.java b/storm-core/src/jvm/backtype/storm/ILocalCluster.java deleted file mode 100644 index 7d5aa35..0000000 --- a/storm-core/src/jvm/backtype/storm/ILocalCluster.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package backtype.storm; - -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.KillOptions; -import backtype.storm.generated.SubmitOptions; -import backtype.storm.generated.NotAliveException; -import backtype.storm.generated.RebalanceOptions; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.TopologyInfo; -import backtype.storm.generated.Credentials; - -import java.util.Map; - - -public interface ILocalCluster { - void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException; - void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException; - void uploadNewCredentials(String topologyName, Credentials creds); - void killTopology(String topologyName) throws NotAliveException; - void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException; - void activate(String topologyName) throws NotAliveException; - void deactivate(String topologyName) throws NotAliveException; - void rebalance(String name, RebalanceOptions options) throws NotAliveException; - void shutdown(); - String getTopologyConf(String id); - StormTopology getTopology(String id); - ClusterSummary getClusterInfo(); - TopologyInfo getTopologyInfo(String id); - Map getState(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ILocalDRPC.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/ILocalDRPC.java b/storm-core/src/jvm/backtype/storm/ILocalDRPC.java deleted file mode 100644 index e478dca..0000000 --- a/storm-core/src/jvm/backtype/storm/ILocalDRPC.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm; - -import backtype.storm.daemon.Shutdownable; -import backtype.storm.generated.DistributedRPC; -import backtype.storm.generated.DistributedRPCInvocations; - - -public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable { - public String getServiceId(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ISubmitterHook.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java deleted file mode 100644 index 331c88f..0000000 --- a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm; - -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.TopologyInfo; - -import java.util.Map; - -/** - * if FQCN of an implementation of this class is specified by setting the config storm.topology.submission.notifier.plugin.class, - * that class's notify method will be invoked when a topology is successfully submitted via StormSubmitter class. - */ -public interface ISubmitterHook { - public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology topology) throws IllegalAccessException; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/LogWriter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/LogWriter.java b/storm-core/src/jvm/backtype/storm/LogWriter.java deleted file mode 100644 index 849f5ca..0000000 --- a/storm-core/src/jvm/backtype/storm/LogWriter.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.InputStream; -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Launch a sub process and write files out to logs. - */ -public class LogWriter extends Thread { - private Logger logger; - private BufferedReader in; - - public LogWriter(InputStream in, Logger logger) { - this.in = new BufferedReader(new InputStreamReader(in)); - this.logger = logger; - } - - public void run() { - Logger logger = this.logger; - BufferedReader in = this.in; - String line; - try { - while ((line = in.readLine()) != null) { - logger.info(line); - } - } catch (IOException e) { - logger.error("Internal ERROR", e); - } finally { - try { - in.close(); - } catch (IOException e) { - logger.error("Internal ERROR", e); - } - } - } - - public void close() throws Exception { - this.join(); - } - - public static void main(String [] args) throws Exception { - ProcessBuilder pb = new ProcessBuilder(args); - Process p = pb.start(); - LogWriter err = null; - LogWriter in = null; - int ret = -1; - try { - Logger logger = LoggerFactory.getLogger("STDERR"); - err = new LogWriter(p.getErrorStream(), logger); - err.start(); - in = new LogWriter(p.getInputStream(), logger); - in.start(); - ret = p.waitFor(); - } finally { - if (err != null) err.close(); - if (in != null) in.close(); - } - System.exit(ret); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/StormSubmitter.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java deleted file mode 100644 index 725b0b1..0000000 --- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java +++ /dev/null @@ -1,496 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.HashMap; -import java.util.Map; - -import backtype.storm.scheduler.resource.ResourceUtils; -import backtype.storm.validation.ConfigValidation; -import org.apache.commons.lang.StringUtils; -import org.apache.thrift.TException; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.security.auth.IAutoCredentials; -import backtype.storm.security.auth.AuthUtils; -import backtype.storm.generated.*; -import backtype.storm.utils.BufferFileInputStream; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; - -/** - * Use this class to submit topologies to run on the Storm cluster. 
You should run your program - * with the "storm jar" command from the command-line, and then use this class to - * submit your topologies. - */ -public class StormSubmitter { - public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class); - - private static final int THRIFT_CHUNK_SIZE_BYTES = 307200; - - private static ILocalCluster localNimbus = null; - - private static String generateZookeeperDigestSecretPayload() { - return Utils.secureRandomLong() + ":" + Utils.secureRandomLong(); - } - - public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+"); - - public static boolean validateZKDigestPayload(String payload) { - if (payload != null) { - Matcher m = zkDigestPattern.matcher(payload); - return m.matches(); - } - return false; - } - - @SuppressWarnings("unchecked") - public static Map prepareZookeeperAuthentication(Map conf) { - Map toRet = new HashMap(); - - // Is the topology ZooKeeper authentication configuration unset? - if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) || - conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null || - ! validateZKDigestPayload((String) - conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) { - - String secretPayload = generateZookeeperDigestSecretPayload(); - toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload); - LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload); - } - - // This should always be set to digest. - toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest"); - - return toRet; - } - - private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) { - Map<String,String> ret = new HashMap<>(); - for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) { - LOG.info("Running "+autoCred); - autoCred.populateCredentials(ret); - } - if (creds != null) { - ret.putAll(creds); - } - return ret; - } - - /** - * Push a new set of credentials to the running topology. 
- * @param name the name of the topology to push credentials to. - * @param stormConf the topology-specific configuration, if desired. See {@link Config}. - * @param credentials the credentials to push. - * @throws AuthorizationException if you are not authorized ot push credentials. - * @throws NotAliveException if the topology is not alive - * @throws InvalidTopologyException if any other error happens - */ - public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials) - throws AuthorizationException, NotAliveException, InvalidTopologyException { - stormConf = new HashMap(stormConf); - stormConf.putAll(Utils.readCommandLineOpts()); - Map conf = Utils.readStormConfig(); - conf.putAll(stormConf); - Map<String,String> fullCreds = populateCredentials(conf, credentials); - if (fullCreds.isEmpty()) { - LOG.warn("No credentials were found to push to " + name); - return; - } - try { - if(localNimbus!=null) { - LOG.info("Pushing Credentials to topology " + name + " in local mode"); - localNimbus.uploadNewCredentials(name, new Credentials(fullCreds)); - } else { - NimbusClient client = NimbusClient.getConfiguredClient(conf); - try { - LOG.info("Uploading new credentials to " + name); - client.getClient().uploadNewCredentials(name, new Credentials(fullCreds)); - } finally { - client.close(); - } - } - LOG.info("Finished submitting topology: " + name); - } catch(TException e) { - throw new RuntimeException(e); - } - } - - - /** - * Submits a topology to run on the cluster. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. 
- * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - * @throws AuthorizationException if authorization is failed - */ - public static void submitTopology(String name, Map stormConf, StormTopology topology) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - submitTopology(name, stormConf, topology, null, null); - } - - /** - * Submits a topology to run on the cluster. A topology runs forever or until - * explicitly killed. - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @param opts to manipulate the starting of the topology. - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - * @throws AuthorizationException if authorization is failed - */ - public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - submitTopology(name, stormConf, topology, opts, null); - } - - /** - * Submits a topology to run on the cluster as a particular user. A topology runs forever or until - * explicitly killed. - * - * @param name - * @param stormConf - * @param topology - * @param opts - * @param progressListener - * @param asUser The user as which this topology should be submitted. - * @throws AlreadyAliveException - * @throws InvalidTopologyException - * @throws AuthorizationException - * @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. 
validateConfs validates confs - */ - public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException { - if(!Utils.isValidConf(stormConf)) { - throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); - } - stormConf = new HashMap(stormConf); - stormConf.putAll(Utils.readCommandLineOpts()); - Map conf = Utils.readStormConfig(); - conf.putAll(stormConf); - stormConf.putAll(prepareZookeeperAuthentication(conf)); - - validateConfs(conf, topology); - - Map<String,String> passedCreds = new HashMap<>(); - if (opts != null) { - Credentials tmpCreds = opts.get_creds(); - if (tmpCreds != null) { - passedCreds = tmpCreds.get_creds(); - } - } - Map<String,String> fullCreds = populateCredentials(conf, passedCreds); - if (!fullCreds.isEmpty()) { - if (opts == null) { - opts = new SubmitOptions(TopologyInitialStatus.ACTIVE); - } - opts.set_creds(new Credentials(fullCreds)); - } - try { - if(localNimbus!=null) { - LOG.info("Submitting topology " + name + " in local mode"); - if(opts!=null) { - localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts); - } else { - // this is for backwards compatibility - localNimbus.submitTopology(name, stormConf, topology); - } - } else { - String serConf = JSONValue.toJSONString(stormConf); - NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser); - if(topologyNameExists(conf, name, asUser)) { - throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); - } - String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser); - try { - LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); - if(opts!=null) { - client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts); - } else { - // 
this is for backwards compatibility - client.getClient().submitTopology(name, jar, serConf, topology); - } - } catch(InvalidTopologyException e) { - LOG.warn("Topology submission exception: "+e.get_msg()); - throw e; - } catch(AlreadyAliveException e) { - LOG.warn("Topology already alive exception", e); - throw e; - } finally { - client.close(); - } - } - LOG.info("Finished submitting topology: " + name); - } catch(TException e) { - throw new RuntimeException(e); - } - invokeSubmitterHook(name, asUser, conf, topology); - - } - - private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) { - try { - if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) { - ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString()).newInstance(); - TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf); - submitterHook.notify(topologyInfo, stormConf, topology); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * Submits a topology to run on the cluster. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. 
- * @param opts to manipulate the starting of the topology - * @param progressListener to track the progress of the jar upload process - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - * @throws AuthorizationException if authorization is failed - */ - @SuppressWarnings("unchecked") - public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, - ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - submitTopologyAs(name, stormConf, topology, opts, progressListener, null); - } - - /** - * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - * @throws AuthorizationException if authorization is failed - */ - - public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - submitTopologyWithProgressBar(name, stormConf, topology, null); - } - - /** - * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. 
- * @param opts to manipulate the starting of the topology - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - * @throws AuthorizationException if authorization is failed - */ - - public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - // show a progress bar so we know we're not stuck (especially on slow connections) - submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() { - @Override - public void onStart(String srcFile, String targetFile, long totalBytes) { - System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); - } - - @Override - public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) { - int length = 50; - int p = (int)((length * bytesUploaded) / totalBytes); - String progress = StringUtils.repeat("=", p); - String todo = StringUtils.repeat(" ", length - p); - - System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes); - } - - @Override - public void onCompleted(String srcFile, String targetFile, long totalBytes) { - System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); - } - }); - } - - private static boolean topologyNameExists(Map conf, String name, String asUser) { - NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser); - try { - ClusterSummary summary = client.getClient().getClusterInfo(); - for(TopologySummary s : summary.get_topologies()) { - if(s.get_name().equals(name)) { - return true; - } - } - return false; - - } catch(Exception e) { - throw new RuntimeException(e); - } finally { - client.close(); - } - } - - private static String submitJar(Map conf, ProgressListener listener) { - return 
submitJar(conf, System.getProperty("storm.jar"), listener); - } - - /** - * Submit jar file - * @param conf the topology-specific configuration. See {@link Config}. - * @param localJar file path of the jar file to submit - * @return the remote location of the submitted jar - */ - public static String submitJar(Map conf, String localJar) { - return submitJar(conf, localJar, null); - } - - - public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) { - if (localJar == null) { - throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); - } - - NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser); - try { - String uploadLocation = client.getClient().beginFileUpload(); - LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation); - BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES); - - long totalSize = new File(localJar).length(); - if (listener != null) { - listener.onStart(localJar, uploadLocation, totalSize); - } - - long bytesUploaded = 0; - while(true) { - byte[] toSubmit = is.read(); - bytesUploaded += toSubmit.length; - if (listener != null) { - listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize); - } - - if(toSubmit.length==0) break; - client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); - } - client.getClient().finishFileUpload(uploadLocation); - - if (listener != null) { - listener.onCompleted(localJar, uploadLocation, totalSize); - } - - LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation); - return uploadLocation; - } catch(Exception e) { - throw new RuntimeException(e); - } finally { - client.close(); - } - } - - /** - * Submit jar file - * @param conf the topology-specific configuration. See {@link Config}. 
- * @param localJar file path of the jar file to submit - * @param listener progress listener to track the jar file upload - * @return the remote location of the submitted jar - */ - public static String submitJar(Map conf, String localJar, ProgressListener listener) { - return submitJarAs(conf,localJar, listener, null); - } - - /** - * Interface use to track progress of file upload - */ - public interface ProgressListener { - /** - * called before file is uploaded - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param totalBytes - total number of bytes of the file - */ - public void onStart(String srcFile, String targetFile, long totalBytes); - - /** - * called whenever a chunk of bytes is uploaded - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param bytesUploaded - number of bytes transferred so far - * @param totalBytes - total number of bytes of the file - */ - public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes); - - /** - * called when the file is uploaded - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param totalBytes - total number of bytes of the file - */ - public void onCompleted(String srcFile, String targetFile, long totalBytes); - } - - private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { - ConfigValidation.validateFields(stormConf); - validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology); - } - - private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map stormConf, StormTopology topology) { - double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf); - Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); - if(topologyWorkerMaxHeapSize < largestMemReq) { - throw new IllegalArgumentException("Topology will not be able to be successfully 
scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" - +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " - + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount"); - } - } - - private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) { - double largestMemoryOperator = 0.0; - for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) { - double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB) - + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); - if(memoryRequirement > largestMemoryOperator) { - largestMemoryOperator = memoryRequirement; - } - } - for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) { - double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB) - + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); - if(memoryRequirement > largestMemoryOperator) { - largestMemoryOperator = memoryRequirement; - } - } - return largestMemoryOperator; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java deleted file mode 100644 index f35b7a7..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
import java.io.IOException;
import java.io.OutputStream;

/**
 * An output stream with all-or-nothing semantics: the written data is
 * committed when the stream is closed, unless {@link #cancel()} is called,
 * in which case the writes are discarded.
 */
public abstract class AtomicOutputStream extends OutputStream {

    /**
     * Discards every write made through this stream and closes it.
     *
     * @throws IOException if the writes cannot be cancelled or the stream cannot be closed
     */
    public abstract void cancel() throws IOException;
}
/**
 * Mutable holder for two strings: a nimbus "host:port" value and the
 * sequence number associated with it. Both are {@code null} until set.
 */
public class BlobKeySequenceInfo {
    private String nimbusHostPort;
    private String sequenceNumber;

    /**
     * @return the stored nimbus host/port string, or {@code null} if none was set
     */
    public String getNimbusHostPort() {
        return this.nimbusHostPort;
    }

    /**
     * @param nimbusHostPort nimbus host/port string to store
     */
    public void setNimbusHostPort(String nimbusHostPort) {
        this.nimbusHostPort = nimbusHostPort;
    }

    /**
     * @return the stored sequence number string, or {@code null} if none was set
     */
    public String getSequenceNumber() {
        return this.sequenceNumber;
    }

    /**
     * @param sequenceNumber sequence number string to store
     */
    public void setSequenceNumber(String sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.regex.Pattern; - -import javax.security.auth.Subject; - -import backtype.storm.nimbus.NimbusInfo; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.daemon.Shutdownable; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.KeyNotFoundException; -import backtype.storm.generated.KeyAlreadyExistsException; -import backtype.storm.generated.ReadableBlobMeta; -import backtype.storm.generated.SettableBlobMeta; - -/** - * Provides a way to store blobs that can be downloaded. - * Blobs must be able to be uploaded and listed from Nimbus, - * and downloaded from the Supervisors. It is a key value based - * store. Key being a string and value being the blob data. - * - * ACL checking must take place against the provided subject. - * If the blob store does not support Security it must validate - * that all ACLs set are always WORLD, everything. - * - * The users can upload their blobs through the blob store command - * line. The command line also allows us to update and delete blobs. 
- * - * Modifying the replication factor only works for HdfsBlobStore - * as for the LocalFsBlobStore the replication is dependent on - * the number of Nimbodes available. - */ -public abstract class BlobStore implements Shutdownable { - private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class); - private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$"); - protected static final String BASE_BLOBS_DIR_NAME = "blobs"; - - /** - * Allows us to initialize the blob store - * @param conf The storm configuration - * @param baseDir The directory path to store the blobs - * @param nimbusInfo Contains the nimbus host, port and leadership information. - */ - public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo); - - /** - * Creates the blob. - * @param key Key for the blob. - * @param meta Metadata which contains the acls information - * @param who Is the subject creating the blob. - * @return AtomicOutputStream returns a stream into which the data - * can be written. - * @throws AuthorizationException - * @throws KeyAlreadyExistsException - */ - public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException; - - /** - * Updates the blob data. - * @param key Key for the blob. - * @param who Is the subject having the write privilege for the blob. - * @return AtomicOutputStream returns a stream into which the data - * can be written. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; - - /** - * Gets the current version of metadata for a blob - * to be viewed by the user or downloaded by the supervisor. - * @param key Key for the blob. - * @param who Is the subject having the read privilege for the blob. 
- * @return AtomicOutputStream returns a stream into which the data - * can be written. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException; - - /** - * Sets the metadata with renewed acls for the blob. - * @param key Key for the blob. - * @param meta Metadata which contains the updated - * acls information. - * @param who Is the subject having the write privilege for the blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException; - - /** - * Deletes the blob data and metadata. - * @param key Key for the blob. - * @param who Is the subject having write privilege for the blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; - - /** - * Gets the InputStream to read the blob details - * @param key Key for the blob. - * @param who Is the subject having the read privilege for the blob. - * @return InputStreamWithMeta has the additional - * file length and version information. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; - - /** - * Returns an iterator with all the list of - * keys currently available on the blob store. - * @return Iterator<String> - */ - public abstract Iterator<String> listKeys(); - - /** - * Gets the replication factor of the blob. - * @param key Key for the blob. - * @param who Is the subject having the read privilege for the blob. - * @return BlobReplication object containing the - * replication factor for the blob. 
- * @throws Exception - */ - public abstract int getBlobReplication(String key, Subject who) throws Exception; - - /** - * Modifies the replication factor of the blob. - * @param key Key for the blob. - * @param replication The replication factor the - * blob has to be set. - * @param who Is the subject having the update privilege for the blob - * @return BlobReplication object containing the - * updated replication factor for the blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - * @throws IOException - */ - public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException; - - /** - * Filters keys based on the KeyFilter - * passed as the argument. - * @param filter KeyFilter - * @param <R> Type - * @return Set of filtered keys - */ - public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) { - Set<R> ret = new HashSet<R>(); - Iterator<String> keys = listKeys(); - while (keys.hasNext()) { - String key = keys.next(); - R filtered = filter.filter(key); - if (filtered != null) { - ret.add(filtered); - } - } - return ret; - } - - /** - * Validates key checking for potentially harmful patterns - * @param key Key for the blob. - */ - public static final void validateKey(String key) throws AuthorizationException { - if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) { - LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN); - throw new AuthorizationException(key+" does not appear to be a valid blob key"); - } - } - - /** - * Wrapper called to create the blob which contains - * the byte data - * @param key Key for the blob. - * @param data Byte data that needs to be uploaded. - * @param meta Metadata which contains the acls information - * @param who Is the subject creating the blob. 
- * @throws AuthorizationException - * @throws KeyAlreadyExistsException - * @throws IOException - */ - public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException { - AtomicOutputStream out = null; - try { - out = createBlob(key, meta, who); - out.write(data); - out.close(); - out = null; - } finally { - if (out != null) { - out.cancel(); - } - } - } - - /** - * Wrapper called to create the blob which contains - * the byte data - * @param key Key for the blob. - * @param in InputStream from which the data is read to be - * written as a part of the blob. - * @param meta Metadata which contains the acls information - * @param who Is the subject creating the blob. - * @throws AuthorizationException - * @throws KeyAlreadyExistsException - * @throws IOException - */ - public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException { - AtomicOutputStream out = null; - try { - out = createBlob(key, meta, who); - byte[] buffer = new byte[2048]; - int len = 0; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - out.close(); - } catch (AuthorizationException | IOException | RuntimeException e) { - if (out !=null) { - out.cancel(); - } - } finally { - in.close(); - } - } - - /** - * Reads the blob from the blob store - * and writes it into the output stream. - * @param key Key for the blob. - * @param out Output stream - * @param who Is the subject having read - * privilege for the blob. 
- * @throws IOException - * @throws KeyNotFoundException - * @throws AuthorizationException - */ - public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException { - InputStreamWithMeta in = getBlob(key, who); - if (in == null) { - throw new IOException("Could not find " + key); - } - byte[] buffer = new byte[2048]; - int len = 0; - try{ - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - } finally { - in.close(); - out.flush(); - } - } - - /** - * Wrapper around readBlobTo which - * returns a ByteArray output stream. - * @param key Key for the blob. - * @param who Is the subject having - * the read privilege for the blob. - * @return ByteArrayOutputStream - * @throws IOException - * @throws KeyNotFoundException - * @throws AuthorizationException - */ - public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - readBlobTo(key, out, who); - byte[] bytes = out.toByteArray(); - out.close(); - return bytes; - } - - /** - * Output stream implementation used for reading the - * metadata and data information. 
- */ - protected class BlobStoreFileOutputStream extends AtomicOutputStream { - private BlobStoreFile part; - private OutputStream out; - - public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException { - this.part = part; - this.out = part.getOutputStream(); - } - - @Override - public void close() throws IOException { - try { - //close means commit - out.close(); - part.commit(); - } catch (IOException | RuntimeException e) { - cancel(); - throw e; - } - } - - @Override - public void cancel() throws IOException { - try { - out.close(); - } finally { - part.cancel(); - } - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte []b) throws IOException { - out.write(b); - } - - @Override - public void write(byte []b, int offset, int len) throws IOException { - out.write(b, offset, len); - } - } - - /** - * Input stream implementation used for writing - * both the metadata containing the acl information - * and the blob data. 
- */ - protected class BlobStoreFileInputStream extends InputStreamWithMeta { - private BlobStoreFile part; - private InputStream in; - - public BlobStoreFileInputStream(BlobStoreFile part) throws IOException { - this.part = part; - this.in = part.getInputStream(); - } - - @Override - public long getVersion() throws IOException { - return part.getModTime(); - } - - @Override - public int read() throws IOException { - return in.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return in.read(b, off, len); - } - - @Override - public int read(byte[] b) throws IOException { - return in.read(b); - } - - @Override - public int available() throws IOException { - return in.available(); - } - - @Override - public long getFileLength() throws IOException { - return part.getFileLength(); - } - } - - /** - * Blob store implements its own version of iterator - * to list the blobs - */ - public static class KeyTranslationIterator implements Iterator<String> { - private Iterator<String> it = null; - private String next = null; - private String prefix = null; - - public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException { - this.it = it; - this.prefix = prefix; - primeNext(); - } - - private void primeNext() { - next = null; - while (it.hasNext()) { - String tmp = it.next(); - if (tmp.startsWith(prefix)) { - next = tmp.substring(prefix.length()); - return; - } - } - } - - @Override - public boolean hasNext() { - return next != null; - } - - @Override - public String next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - String current = next; - primeNext(); - return current; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Delete Not Supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---------------------------------------------------------------------- diff 
--git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java deleted file mode 100644 index c0c4e5c..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java +++ /dev/null @@ -1,399 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.Config; -import backtype.storm.generated.AccessControl; -import backtype.storm.generated.AccessControlType; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.SettableBlobMeta; -import backtype.storm.security.auth.AuthUtils; -import backtype.storm.security.auth.IPrincipalToLocal; -import backtype.storm.security.auth.NimbusPrincipal; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.Subject; -import java.security.Principal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Provides common handling of acls for Blobstores. 
- * Also contains some static utility functions related to Blobstores. - */ -public class BlobStoreAclHandler { - public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class); - private final IPrincipalToLocal _ptol; - - public static final int READ = 0x01; - public static final int WRITE = 0x02; - public static final int ADMIN = 0x04; - public static final List<AccessControl> WORLD_EVERYTHING = - Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN)); - public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>(); - private Set<String> _supervisors; - private Set<String> _admins; - - public BlobStoreAclHandler(Map conf) { - _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf); - _supervisors = new HashSet<String>(); - _admins = new HashSet<String>(); - if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) { - _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS)); - } - if (conf.containsKey(Config.NIMBUS_ADMINS)) { - _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS)); - } - } - - private static AccessControlType parseACLType(String type) { - if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) { - return AccessControlType.OTHER; - } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) { - return AccessControlType.USER; - } - throw new IllegalArgumentException(type+" is not a valid access control type"); - } - - private static int parseAccess(String access) { - int ret = 0; - for (char c: access.toCharArray()) { - if ('r' == c) { - ret = ret | READ; - } else if ('w' == c) { - ret = ret | WRITE; - } else if ('a' == c) { - ret = ret | ADMIN; - } else if ('-' == c) { - //ignored - } else { - throw new IllegalArgumentException(""); - } - } - return ret; - } - - public static AccessControl parseAccessControl(String str) { - String[] parts = str.split(":"); - String type = "other"; - String name = ""; - String access = "-"; - if (parts.length > 3) 
{ - throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value"); - } else if (parts.length == 1) { - type = "other"; - name = ""; - access = parts[0]; - } else if (parts.length == 2) { - type = "user"; - name = parts[0]; - access = parts[1]; - } else if (parts.length == 3) { - type = parts[0]; - name = parts[1]; - access = parts[2]; - } - AccessControl ret = new AccessControl(); - ret.set_type(parseACLType(type)); - ret.set_name(name); - ret.set_access(parseAccess(access)); - return ret; - } - - private static String accessToString(int access) { - StringBuilder ret = new StringBuilder(); - ret.append(((access & READ) > 0) ? "r" : "-"); - ret.append(((access & WRITE) > 0) ? "w" : "-"); - ret.append(((access & ADMIN) > 0) ? "a" : "-"); - return ret.toString(); - } - - public static String accessControlToString(AccessControl ac) { - StringBuilder ret = new StringBuilder(); - switch(ac.get_type()) { - case OTHER: - ret.append("o"); - break; - case USER: - ret.append("u"); - break; - default: - throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means "); - } - ret.append(":"); - if (ac.is_set_name()) { - ret.append(ac.get_name()); - } - ret.append(":"); - ret.append(accessToString(ac.get_access())); - return ret.toString(); - } - - public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException { - Set<String> aclUsers = new HashSet<>(); - List<String> duplicateUsers = new ArrayList<>(); - for (AccessControl acl : acls) { - String aclUser = acl.get_name(); - if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) { - LOG.error("'{}' user can't appear more than once in the ACLs", aclUser); - duplicateUsers.add(aclUser); - } - } - if (duplicateUsers.size() > 0) { - String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray()) - + " can't appear more than once in the ACLs for key [" + key +"]."; - throw new AuthorizationException(errorMessage); - } - } - - 
private Set<String> constructUserFromPrincipals(Subject who) { - Set<String> user = new HashSet<String>(); - if (who != null) { - for (Principal p : who.getPrincipals()) { - user.add(_ptol.toLocal(p)); - } - } - return user; - } - - private boolean isAdmin(Subject who) { - Set<String> user = constructUserFromPrincipals(who); - for (String u : user) { - if (_admins.contains(u)) { - return true; - } - } - return false; - } - - private boolean isReadOperation(int operation) { - if (operation == 1) { - return true; - } - return false; - } - - private boolean isSupervisor(Subject who, int operation) { - Set<String> user = constructUserFromPrincipals(who); - if (isReadOperation(operation)) { - for (String u : user) { - if (_supervisors.contains(u)) { - return true; - } - } - } - return false; - } - - private boolean isNimbus(Subject who) { - Set<Principal> principals; - boolean isNimbusInstance = false; - if (who != null) { - principals = who.getPrincipals(); - for (Principal principal : principals) { - if (principal instanceof NimbusPrincipal) { - isNimbusInstance = true; - } - } - } - return isNimbusInstance; - } - - public boolean checkForValidUsers(Subject who, int mask) { - return isNimbus(who) || isAdmin(who) || isSupervisor(who,mask); - } - - /** - * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN - */ - public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException { - hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key); - } - - /** - * Validates if the user has any of the permissions - * mentioned in the mask. - * @param acl ACL for the key. - * @param mask mask holds the cumulative value of - * READ = 1, WRITE = 2 or ADMIN = 4 permissions. - * mask = 1 implies READ privilege. - * mask = 5 implies READ and ADMIN privileges. - * @param who Is the user against whom the permissions - * are validated for a key using the ACL and the mask. 
- * @param key Key used to identify the blob. - * @throws AuthorizationException - */ - public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException { - Set<String> user = constructUserFromPrincipals(who); - LOG.debug("user {}", user); - if (checkForValidUsers(who, mask)) { - return; - } - for (AccessControl ac : acl) { - int allowed = getAllowed(ac, user); - LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key); - if ((allowed & mask) > 0) { - return; - } - } - throw new AuthorizationException( - user + " does not have access to " + key); - } - - /** - * Validates if the user has at least the set of permissions - * mentioned in the mask. - * @param acl ACL for the key. - * @param mask mask holds the cumulative value of - * READ = 1, WRITE = 2 or ADMIN = 4 permissions. - * mask = 1 implies READ privilege. - * mask = 5 implies READ and ADMIN privileges. - * @param who Is the user against whom the permissions - * are validated for a key using the ACL and the mask. - * @param key Key used to identify the blob. 
- * @throws AuthorizationException - */ - public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException { - Set<String> user = constructUserFromPrincipals(who); - LOG.debug("user {}", user); - if (checkForValidUsers(who, mask)) { - return; - } - for (AccessControl ac : acl) { - int allowed = getAllowed(ac, user); - mask = ~allowed & mask; - LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key); - } - if (mask == 0) { - return; - } - throw new AuthorizationException( - user + " does not have " + namedPerms(mask) + " access to " + key); - } - - public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) { - meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask)); - } - - private String namedPerms(int mask) { - StringBuilder b = new StringBuilder(); - b.append("["); - if ((mask & READ) > 0) { - b.append("READ "); - } - if ((mask & WRITE) > 0) { - b.append("WRITE "); - } - if ((mask & ADMIN) > 0) { - b.append("ADMIN "); - } - b.append("]"); - return b.toString(); - } - - private int getAllowed(AccessControl ac, Set<String> users) { - switch (ac.get_type()) { - case OTHER: - return ac.get_access(); - case USER: - if (users.contains(ac.get_name())) { - return ac.get_access(); - } - return 0; - default: - return 0; - } - } - - private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) { - List<AccessControl> resultAcl = new ArrayList<AccessControl>(); - for (AccessControl control : accessControls) { - if(control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0 )) { - LOG.debug("Removing invalid blobstore world ACL " + - BlobStoreAclHandler.accessControlToString(control)); - continue; - } - resultAcl.add(control); - } - return resultAcl; - } - - private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who, - int opMask) { - List<AccessControl> 
cleanAcls = removeBadACLs(acls); - Set<String> userNames = getUserNamesFromSubject(who); - for (String user : userNames) { - fixACLsForUser(cleanAcls, user, opMask); - } - if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) { - cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls); - if (!acls.isEmpty()) - LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key); - } - return cleanAcls; - } - - private boolean worldEverything(List<AccessControl> acls) { - boolean isWorldEverything = false; - for (AccessControl acl : acls) { - if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) { - isWorldEverything = true; - break; - } - } - return isWorldEverything; - } - - private void fixACLsForUser(List<AccessControl> acls, String user, int mask) { - boolean foundUserACL = false; - for (AccessControl control : acls) { - if (control.get_type() == AccessControlType.USER && control.get_name().equals(user)) { - int currentAccess = control.get_access(); - if ((currentAccess & mask) != mask) { - control.set_access(currentAccess | mask); - } - foundUserACL = true; - break; - } - } - if (!foundUserACL) { - AccessControl userACL = new AccessControl(); - userACL.set_type(AccessControlType.USER); - userACL.set_name(user); - userACL.set_access(mask); - acls.add(userACL); - } - } - - private Set<String> getUserNamesFromSubject(Subject who) { - Set<String> user = new HashSet<String>(); - if (who != null) { - for(Principal p: who.getPrincipals()) { - user.add(_ptol.toLocal(p)); - } - } - return user; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java 
b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java deleted file mode 100644 index 22ccf97..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.generated.SettableBlobMeta; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.regex.Pattern; - -/** - * Provides an base implementation for creating a blobstore based on file backed storage. 
- */ -public abstract class BlobStoreFile { - public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class); - - protected static final String TMP_EXT = ".tmp"; - protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$"); - protected static final String BLOBSTORE_DATA_FILE = "data"; - - public abstract void delete() throws IOException; - public abstract String getKey(); - public abstract boolean isTmp(); - public abstract void setMetadata(SettableBlobMeta meta); - public abstract SettableBlobMeta getMetadata(); - public abstract long getModTime() throws IOException; - public abstract InputStream getInputStream() throws IOException; - public abstract OutputStream getOutputStream() throws IOException; - public abstract void commit() throws IOException; - public abstract void cancel() throws IOException; - public abstract long getFileLength() throws IOException; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java deleted file mode 100644 index 97fb262..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package backtype.storm.blobstore;

import backtype.storm.Config;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.KeyAlreadyExistsException;
import backtype.storm.generated.KeyNotFoundException;
import backtype.storm.generated.ReadableBlobMeta;
import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.security.auth.NimbusPrincipal;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import backtype.storm.utils.ZookeeperAuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.Subject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Static helpers for keeping a local {@link BlobStore} in step with other nimbus hosts:
 * reading per-key sequence-number state from ZooKeeper and copying blobs from remote
 * nimbus hosts through a {@link NimbusClient}.
 */
public class BlobStoreUtils {
    private static final String BLOBSTORE_SUBTREE = "/blobstore";
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);

    /**
     * Builds and starts a Curator client from the ZooKeeper settings in {@code conf}.
     *
     * @param conf configuration providing {@code STORM_ZOOKEEPER_SERVERS},
     *             {@code STORM_ZOOKEEPER_PORT} and {@code STORM_ZOOKEEPER_ROOT}
     * @return a started client; the caller owns it and is responsible for closing it
     */
    @SuppressWarnings("unchecked")
    public static CuratorFramework createZKClient(Map conf) {
        List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
        CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port,
                (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
        zkClient.start();
        return zkClient;
    }

    /**
     * @return a new {@link Subject} whose only principal is a {@link NimbusPrincipal}
     */
    public static Subject getNimbusSubject() {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NimbusPrincipal());
        return subject;
    }

    /**
     * Splits a state string of the form {@code <nimbus host:port>-<sequence number>}.
     *
     * <p>The split happens at the LAST {@code '-'}, so dashes earlier in the string stay
     * in the host/port part.
     *
     * @param nimbusSeqNumberInfo the state string to split
     * @return the two parts; the sequence number is kept as an unparsed string
     * @throws StringIndexOutOfBoundsException if the string contains no {@code '-'}
     */
    public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(String nimbusSeqNumberInfo) {
        BlobKeySequenceInfo keySequenceInfo = new BlobKeySequenceInfo();
        int lastIndex = nimbusSeqNumberInfo.lastIndexOf("-");
        keySequenceInfo.setNimbusHostPort(nimbusSeqNumberInfo.substring(0, lastIndex));
        keySequenceInfo.setSequenceNumber(nimbusSeqNumberInfo.substring(lastIndex + 1));
        return keySequenceInfo;
    }

    /**
     * Reads the children of {@code /blobstore/<key>} in ZooKeeper and returns the nimbus
     * hosts whose entry carries the highest sequence number.
     *
     * @param zkClient started Curator client
     * @param key      blob key
     * @return the nimbus hosts at the latest sequence number (empty if the node has no children)
     * @throws Exception on any Curator failure, or {@link NumberFormatException} if a
     *                   child's sequence part is not an integer
     */
    public static Set<NimbusInfo> getNimbodesWithLatestSequenceNumberOfBlob(CuratorFramework zkClient, String key) throws Exception {
        // Same path as before ("/blobstore/" + key), but built from the shared constant so it
        // cannot drift from the path used by updateKeyForBlobStore.
        List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
        Set<NimbusInfo> nimbusInfoSet = new HashSet<NimbusInfo>();
        int latestSeqNumber = getLatestSequenceNumber(stateInfoList);
        LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} version {}", stateInfoList, latestSeqNumber);
        // Get the nimbodes with the latest version
        for (String state : stateInfoList) {
            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
            if (latestSeqNumber == Integer.parseInt(sequenceInfo.getSequenceNumber())) {
                nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort()));
            }
        }
        LOG.debug("nimbusInfoList {}", nimbusInfoSet);
        return nimbusInfoSet;
    }

    /**
     * Returns the largest sequence number found in the given state strings.
     *
     * @param stateInfoList state strings in {@code <host:port>-<sequence number>} form
     * @return the maximum sequence number, or {@code 0} if the list is empty or no entry
     *         exceeds zero
     * @throws NumberFormatException if a sequence part is not an integer
     */
    public static int getLatestSequenceNumber(List<String> stateInfoList) {
        int seqNumber = 0;
        for (String state : stateInfoList) {
            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
            int currentSeqNumber = Integer.parseInt(sequenceInfo.getSequenceNumber());
            if (seqNumber < currentSeqNumber) {
                seqNumber = currentSeqNumber;
                LOG.debug("Sequence Info {}", seqNumber);
            }
        }
        LOG.debug("Latest Sequence Number {}", seqNumber);
        return seqNumber;
    }

    /**
     * Tries each nimbus host in turn until the blob for {@code key} has been created in
     * the local blobstore.
     *
     * <p>Every attempt now closes the {@link NimbusClient} and the remote input stream it
     * opened; previously both were left open on every iteration.
     *
     * @param conf        configuration passed to the {@link NimbusClient}
     * @param blobStore   local blobstore to create the blob in
     * @param key         blob key
     * @param nimbusInfos candidate hosts to download from
     * @return {@code true} if the key is present in {@code blobStore} after a download
     * @throws TTransportException declared for callers; connection failures raised inside
     *                             the loop are logged and the next host is tried
     * @throws RuntimeException    wrapping an {@link IOException} or
     *                             {@link AuthorizationException} from an attempt
     */
    public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
            throws TTransportException {
        boolean isSuccess = false;
        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
        for (NimbusInfo nimbusInfo : nimbusInfos) {
            if (isSuccess) {
                break;
            }
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null);
                ReadableBlobMeta rbm = client.getClient().getBlobMeta(key);
                ClientBlobStore remoteBlobStore = new NimbusBlobStore();
                remoteBlobStore.setClient(conf, client);
                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
                    blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
                }
                // Confirm the key really is listed by the local store before reporting success.
                if (isKeyListed(blobStore, key)) {
                    LOG.debug("Success creating key, {}", key);
                    isSuccess = true;
                }
            } catch (IOException | AuthorizationException exception) {
                throw new RuntimeException(exception);
            } catch (KeyAlreadyExistsException kae) {
                LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae);
            } catch (KeyNotFoundException knf) {
                // Catching and logging KeyNotFoundException because, if
                // there is a subsequent update and delete, the non-leader
                // nimbodes might throw an exception.
                LOG.info("KeyNotFoundException Key: {} {}", key, knf);
            } catch (Exception exp) {
                // Logging an exception while client is connecting
                LOG.error("Exception {}", exp);
            } finally {
                closeClient(client);
            }
        }

        if (!isSuccess) {
            LOG.error("Could not download blob with key {}", key);
        }
        return isSuccess;
    }

    /**
     * Tries each nimbus host in turn until the blob for {@code key} has been overwritten
     * in the local blobstore with the remote content.
     *
     * <p>If copying fails part-way, the pending update is cancelled instead of being left
     * open; the {@link NimbusClient} and the remote stream are closed after every attempt.
     *
     * @param conf        configuration passed to the {@link NimbusClient}
     * @param blobStore   local blobstore holding the blob to update
     * @param key         blob key
     * @param nimbusInfos candidate hosts to download from
     * @return {@code true} if one attempt copied and closed the update successfully
     * @throws TTransportException declared for callers; connection failures raised inside
     *                             the loop are logged and the next host is tried
     * @throws RuntimeException    wrapping an {@link IOException} or
     *                             {@link AuthorizationException} from an attempt
     */
    public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
            throws TTransportException {
        boolean isSuccess = false;
        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
        for (NimbusInfo nimbusInfo : nimbusInfos) {
            if (isSuccess) {
                break;
            }
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null);
                ClientBlobStore remoteBlobStore = new NimbusBlobStore();
                remoteBlobStore.setClient(conf, client);
                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
                    copyAndCommit(in, blobStore.updateBlob(key, getNimbusSubject()));
                }
                isSuccess = true;
            } catch (IOException | AuthorizationException exception) {
                throw new RuntimeException(exception);
            } catch (KeyNotFoundException knf) {
                // Catching and logging KeyNotFoundException because, if
                // there is a subsequent update and delete, the non-leader
                // nimbodes might throw an exception.
                LOG.info("KeyNotFoundException {}", knf);
            } catch (Exception exp) {
                // Logging an exception while client is connecting
                LOG.error("Exception {}", exp);
            } finally {
                closeClient(client);
            }
        }

        if (!isSuccess) {
            LOG.error("Could not update the blob with key {}", key);
        }
        return isSuccess;
    }

    /**
     * Collects every key reported by {@link BlobStore#listKeys()} into a list.
     *
     * @param blobStore the store to enumerate
     * @return the keys in iteration order; empty if the store returns a {@code null} iterator
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    public static List<String> getKeyListFromBlobStore(BlobStore blobStore) throws Exception {
        Iterator<String> keys = blobStore.listKeys();
        List<String> keyList = new ArrayList<String>();
        if (keys != null) {
            while (keys.hasNext()) {
                keyList.add(keys.next());
            }
        }
        LOG.debug("KeyList from blobstore {}", keyList);
        return keyList;
    }

    /**
     * Asks the nimbus at {@code nimbusInfo} to create ZooKeeper state for {@code key}.
     * The {@link NimbusClient} opened for the call is closed before returning.
     *
     * @param conf       configuration passed to the {@link NimbusClient}
     * @param key        blob key
     * @param nimbusInfo nimbus host/port to contact
     * @throws TTransportException if the client cannot connect
     */
    public static void createStateInZookeeper(Map conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
        NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null);
        try {
            ClientBlobStore cb = new NimbusBlobStore();
            cb.setClient(conf, client);
            cb.createStateInZookeeper(key);
        } finally {
            closeClient(client);
        }
    }

    /**
     * Brings the local copy of {@code key} up to the latest sequence number recorded in
     * ZooKeeper when the current nimbus is not already among the hosts holding it.
     *
     * <p>Does nothing when {@code nimbusDetails} is {@code null} or when
     * {@code /blobstore/<key>} does not exist in ZooKeeper.
     *
     * @param conf          configuration passed to the {@link NimbusClient}
     * @param blobStore     local blobstore
     * @param zkClient      started Curator client
     * @param key           blob key
     * @param nimbusDetails host/port of the current nimbus; may be {@code null}
     * @throws RuntimeException wrapping any exception raised while checking or updating
     */
    public static void updateKeyForBlobStore (Map conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
        try {
            // Most of clojure tests currently try to access the blobs using getBlob. Since updateKeyForBlobStore
            // checks for updating the correct version of the blob as a part of nimbus ha before performing any
            // operation on it, there is a necessity to stub several test cases to ignore this method. It is a valid
            // trade off to return if nimbusDetails which include the details of the current nimbus host port data are
            // not initialized as a part of the test. Moreover, this applies to only local blobstore when used along with
            // nimbus ha.
            if (nimbusDetails == null) {
                return;
            }
            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
                return;
            }
            List<String> stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
            LOG.debug("StateInfo for update {}", stateInfo);
            Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);

            // NOTE(review): only the host name is compared, not the port - confirm this is intended.
            boolean isListContainsCurrentNimbusInfo = false;
            for (NimbusInfo nimbusInfo : nimbusInfoList) {
                if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
                    isListContainsCurrentNimbusInfo = true;
                    break;
                }
            }

            if (!isListContainsCurrentNimbusInfo && downloadUpdatedBlob(conf, blobStore, key, nimbusInfoList)) {
                LOG.debug("Updating state inside zookeeper for an update");
                createStateInZookeeper(conf, key, nimbusDetails);
            }
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }

    /** Returns whether {@code key} appears in the keys listed by {@code blobStore}. */
    private static boolean isKeyListed(BlobStore blobStore, String key) {
        Iterator<String> keyIterator = blobStore.listKeys();
        while (keyIterator != null && keyIterator.hasNext()) {
            if (keyIterator.next().equals(key)) {
                return true;
            }
        }
        return false;
    }

    /** Copies {@code in} to {@code out} and closes {@code out}; cancels {@code out} if anything fails. */
    private static void copyAndCommit(InputStreamWithMeta in, AtomicOutputStream out) throws IOException {
        boolean committed = false;
        try {
            byte[] buffer = new byte[2048];
            int len;
            while ((len = in.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
            out.close();
            committed = true;
        } finally {
            if (!committed) {
                try {
                    out.cancel();
                } catch (IOException cancelFailure) {
                    // Keep the original failure as the one that propagates.
                    LOG.warn("Failed to cancel partial blob update", cancelFailure);
                }
            }
        }
    }

    /** Closes {@code client} if it was opened; a {@code null} client is ignored. */
    private static void closeClient(NimbusClient client) {
        if (client != null) {
            client.close();
        }
    }

}
